Benchmarking
Below, the logic of the three static benchmarks is presented, along with the linear optimization model. On github, fully implemented benchmarking notebooks are uploaded for each benchmark.
Uncontrolled charging
Every time step, the maximum charging signal is sent so that the battery is immediately charged upon arrival.
for i in range(episode_length * timesteps_per_hour * n_episodes):
if dumb_norm_vec_env.env_method("is_done")[0]:
dumb_norm_vec_env.reset()
dumb_norm_vec_env.step([np.ones(n_evs)])
Distributed charging
In the distributed charging strategy, the charging process is spread out across the entire stay,
and the battery only reaches full charge before arrival. The method get_dist_factor
is used to
get the laxity / distribution factor for each time step: time needed / time left.
for i in range(episode_length * timesteps_per_hour * n_episodes):
if dist_norm_vec_env.env_method("is_done")[0]:
dist_norm_vec_env.reset()
dist_norm_vec_env.step(([np.clip(np.multiply(np.ones(n_evs), dist_norm_vec_env.env_method("get_dist_factor")[0]),0,1)]))
Night charging
Night charging preferably starts charging at midnight, but begins earlier if the battery requires a longer charging time. The earliest departure time globally is considered to ensure a full battery in the worst case scenario. During the day between 11 and 14, a charging signal is sent to the battery to accommodate for the lunch break in the caretaker use-case.
df = env.db
df_leaving_home = df[(df['Location'].shift() == 'home') & (df['Location'] == 'driving')]
earliest_dep_time = df_leaving_home['date'].dt.time.min()
day_of_earliest_dep = df_leaving_home[df_leaving_home['date'].dt.time == earliest_dep_time]['date'].min()
earliest_dep = earliest_dep_time.hour + earliest_dep_time.minute/60
evse = env.load_calculation.evse_max_power
cap = env.ev_conf.init_battery_cap
target_soc = env.ev_conf.target_soc
eff = env.ev_conf.charging_eff
max_time_needed = target_soc * cap / eff / evse # time needed to charge to target soc from 0
difference = earliest_dep - max_time_needed
starting_time = (24 + difference)
if starting_time > 24:
starting_time = 23.99 # always start just before midnight
charging_hour = int(math.modf(starting_time)[1])
minutes = np.asarray([0, 15, 30, 45])
# split number and decimals, use decimals and choose the closest minute
closest_index = np.abs(minutes - int(math.modf(starting_time)[0]*60)).argmin()
charging_minute = minutes[closest_index]
episode_length = n_steps
n_episodes = n_episodes
night_norm_vec_env.reset()
charging=False
for i in range(episode_length * timesteps_per_hour * n_episodes):
if night_norm_vec_env.env_method("is_done")[0]:
night_norm_vec_env.reset()
time: pd.Timestamp = night_norm_vec_env.env_method("get_time")[0]
if ((time.hour >= 11) and (time.hour <= 14)) and (use_case=="ct"):
night_norm_vec_env.step(([np.clip(np.multiply(np.ones(n_evs), night_norm_vec_env.env_method("get_dist_factor")[0]),0,1)]))
continue
time: pd.Timestamp = night_norm_vec_env.env_method("get_time")[0]
if (((charging_hour <= time.hour) and (charging_minute <= time.minute)) or (charging)):
if not charging:
charging_start: pd.Timestamp = copy(time)
charging=True
night_norm_vec_env.step([np.ones(n_evs)])
else:
night_norm_vec_env.step([np.zeros(n_evs)])
if charging and ((time - charging_start).total_seconds()/3600 > int(max_time_needed)):
charging = False
Linear optimization model
The linear optimization model does 2 things: first, it calculates an optimal result, based on the available information of the FleetRL use-case (building load, vehicle schedules, etc.). Its objective is cost minimization and its variables are the actions in the range of [0,1] -> identical to the RL agents. Once a result has been obtained, it is run on a FleetRL environment -> the linear agent steps through the env and generates the log file that saves charging cost, SoH, violations, etc.
# importing necessary libraries
import datetime as dt
import numpy as np
import math
import matplotlib.pyplot as plt
from typing import Literal
from stable_baselines3.common.evaluation import evaluate_policy
from stable_baselines3 import PPO, TD3
from stable_baselines3.common.vec_env import VecNormalize, SubprocVecEnv
from stable_baselines3.common.env_util import make_vec_env
from FleetRL.fleet_env.fleet_environment import FleetEnv
import pandas as pd
import pyomo.environ as pyo
# wrapper for parallelization
if __name__ == "__main__":
# define parameters here for easier change
n_steps = 8600
n_episodes = 1
n_evs = 5
n_envs = 1
time_steps_per_hour = 4
use_case: str = "lmd" # for file name
scenario: Literal["arb", "real"] = "real"
# environment arguments
env_kwargs = {"schedule_name": "5_lmd_eval.csv",
"building_name": "load_lmd.csv",
"use_case": "lmd",
"include_building": True,
"include_pv": True,
"time_picker": "static",
"deg_emp": False,
"include_price": True,
"ignore_price_reward": False,
"ignore_invalid_penalty": True,
"ignore_overcharging_penalty": True,
"ignore_overloading_penalty": False,
"episode_length": n_steps,
"normalize_in_env": False,
"verbose": 0,
"aux": True,
"log_data": True,
"calculate_degradation": True
}
# adapting price information according to user input
if scenario == "real":
env_kwargs["spot_markup"] = 10
env_kwargs["spot_mul"] = 1.5
env_kwargs["feed_in_ded"] = 0.25
env_kwargs["price_name"] = "spot_2021_new_tariff.csv"
env_kwargs["tariff_name"] = "fixed_feed_in.csv"
elif scenario == "arb":
env_kwargs["spot_markup"] = 0
env_kwargs["spot_mul"] = 1
env_kwargs["feed_in_ded"] = 0
env_kwargs["price_name"] = "spot_2021_new.csv"
env_kwargs["tariff_name"] = "spot_2021_new.csv"
# creating vec_env for linear agent
lin_vec_env = make_vec_env(FleetEnv,
n_envs=n_envs,
vec_env_cls=SubprocVecEnv,
env_kwargs=env_kwargs)
# normalization
lin_norm_vec_env = VecNormalize(venv=lin_vec_env,
norm_obs=True,
norm_reward=True,
training=True,
clip_reward=10.0)
# creating an env object for accessing information in the pyomo model
env = FleetEnv(use_case=use_case,
schedule_name=env_kwargs["schedule_name"],
tariff_name=env_kwargs["tariff_name"],
price_name=env_kwargs["price_name"],
episode_length=n_steps,
time_picker_name=env_kwargs["time_picker"],
building_name=env_kwargs["building_name"],
spot_markup=env_kwargs["spot_markup"],
spot_mul=env_kwargs["spot_mul"],
feed_in_ded=env_kwargs["feed_in_ded"])
# reading the input file as a pandas DataFrame
df: pd.DataFrame = env.db
# Extracting information from the df
ev_data = [df.loc[df["ID"]==i, "There"] for i in range(n_evs)]
building_data = df["load"] # building load in kW
# length of time, load, and pv series (with multiple EVs, this is the index)
length_time_load_pv = 8760 * 4
# price and tariff data
price_data = np.multiply(np.add(df["DELU"], env.ev_conf.fixed_markup), env.ev_conf.variable_multiplier) / 1000
tariff_data = np.multiply(df["tariff"], 1-env.ev_conf.feed_in_deduction) / 1000
# pv data
pv_data = df["pv"] # pv power in kW
# soc on return, separately for each EV
soc_on_return = [df.loc[df["ID"]==i, "SOC_on_return"] for i in range(n_evs)]
# further model parameters
battery_capacity = env.ev_conf.init_battery_cap # EV batt size in kWh
p_trafo = env.load_calculation.grid_connection # Transformer rating in kW
charging_eff = env.ev_conf.charging_eff # charging losses
discharging_eff = env.ev_conf.discharging_eff # discharging losses
init_soc = env.ev_conf.def_soc # init SoC
evse_max_power = env.load_calculation.evse_max_power # kW, max rating of the charger
# create pyomo model
model = pyo.ConcreteModel(name="sc_pyomo")
# create sets for the pyomo optimization
model.timestep = pyo.Set(initialize=range(length_time_load_pv))
model.time_batt = pyo.Set(initialize=range(0, length_time_load_pv+1))
model.ev_id = pyo.Set(initialize=range(n_evs))
# model parameters, i,j used in case multiple EVs exist
model.building_load = pyo.Param(model.timestep, initialize={i: building_data[i] for i in range(length_time_load_pv)})
model.pv = pyo.Param(model.timestep, initialize={i: pv_data[i] for i in range(length_time_load_pv)})
model.ev_availability = pyo.Param(model.timestep, model.ev_id, initialize={(i, j): ev_data[j].iloc[i] for i in range(length_time_load_pv) for j in range(n_evs)})
model.soc_on_return = pyo.Param(model.timestep, model.ev_id, initialize={(i, j): soc_on_return[j].iloc[i] for i in range(length_time_load_pv) for j in range(n_evs)})
model.price = pyo.Param(model.timestep, initialize={i: price_data[i] for i in range(length_time_load_pv)})
model.tariff = pyo.Param(model.timestep, initialize={i: tariff_data[i] for i in range(length_time_load_pv)})
# decision variables
# this assumes only charging, I could also make bidirectional later
model.soc = pyo.Var(model.time_batt, model.ev_id, bounds=(0, env.ev_conf.target_soc))
model.charging_signal = pyo.Var(model.timestep, model.ev_id, within=pyo.NonNegativeReals, bounds=(0,1))
model.discharging_signal = pyo.Var(model.timestep, model.ev_id, within=pyo.NonPositiveReals, bounds=(-1,0))
model.positive_action = pyo.Var(model.timestep, model.ev_id, within=pyo.Binary)
model.used_pv = pyo.Var(model.timestep, model.ev_id, within=pyo.NonNegativeReals)
# constraints
def grid_limit(m, i, ev):
return ((m.charging_signal[i, ev] + m.discharging_signal[i, ev]) * evse_max_power
+ m.building_load[i] - m.pv[i] <= p_trafo)
# charging an discharging cannot occur at the same time, big M method
def mutual_exclusivity_charging(m, i, ev):
return m.charging_signal[i, ev] <= m.positive_action[i, ev]
def mutual_exclusivity_discharging(m, i, ev):
return m.discharging_signal[i, ev] >= (m.positive_action[i, ev] - 1)
# PV prioritized over grid
def pv_use(m, i, ev):
return m.used_pv[i, ev] <= m.charging_signal[i, ev] * evse_max_power
# only use as much PV as available
def pv_avail(m, i, ev):
return m.used_pv[i, ev] <= m.pv[i] / n_evs
def no_charge_when_no_car(m, i, ev):
if m.ev_availability[i, ev] == 0:
return m.charging_signal[i, ev] == 0
else:
return pyo.Constraint.Feasible
def no_discharge_when_no_car(m, i, ev):
if m.ev_availability[i, ev] == 0:
return m.discharging_signal[i, ev] == 0
else:
return pyo.Constraint.Feasible
def soc_rules(m, i, ev):
#last time step
if i == length_time_load_pv-1:
return (m.soc[i+1, ev]
== m.soc[i, ev] + (m.charging_signal[i, ev]*charging_eff + m.discharging_signal[i, ev])
* evse_max_power * 1 / time_steps_per_hour / battery_capacity)
# new arrival
elif (m.ev_availability[i, ev] == 0) and (m.ev_availability[i+1, ev] == 1):
return m.soc[i+1, ev] == m.soc_on_return[i+1, ev]
# departure in next time step
elif (m.ev_availability[i, ev] == 1) and (m.ev_availability[i+1, ev] == 0):
return m.soc[i, ev] == env.ev_conf.target_soc
else:
return pyo.Constraint.Feasible
def charging_dynamics(m, i, ev):
#last time step
if i == length_time_load_pv-1:
return (m.soc[i+1, ev]
== m.soc[i, ev] + (m.charging_signal[i, ev]*charging_eff + m.discharging_signal[i, ev])
* evse_max_power * 1 / time_steps_per_hour / battery_capacity)
# charging
if (m.ev_availability[i, ev] == 1) and (m.ev_availability[i+1, ev] == 1):
return (m.soc[i+1, ev]
== m.soc[i, ev] + (m.charging_signal[i, ev]*charging_eff + m.discharging_signal[i, ev])
* evse_max_power * 1 / time_steps_per_hour / battery_capacity)
elif (m.ev_availability[i, ev] == 1) and (m.ev_availability[i+1, ev] == 0):
return m.soc[i+1, ev] == 0
elif m.ev_availability[i, ev] == 0:
return m.soc[i, ev] == 0
else:
return pyo.Constraint.Feasible
def max_charging_limit(m, i, ev):
return m.charging_signal[i, ev]*evse_max_power <= evse_max_power * m.ev_availability[i, ev]
def max_discharging_limit(m, i, ev):
return m.discharging_signal[i, ev]*evse_max_power*-1 <= evse_max_power * m.ev_availability[i, ev]
def first_soc(m, i, ev):
return m.soc[0, ev] == init_soc
def no_departure_abuse(m, i, ev):
if i == length_time_load_pv - 1:
return pyo.Constraint.Feasible
if (m.ev_availability[i, ev] == 0) and (m.ev_availability[i-1, ev]) == 1:
return m.discharging_signal[i, ev] == 0
elif (m.ev_availability[i, ev] == 1) and (m.ev_availability[i+1, ev]) == 0:
return m.discharging_signal[i, ev] == 0
else:
return pyo.Constraint.Feasible
# constraints
model.cs1 = pyo.Constraint(model.timestep, model.ev_id, rule=first_soc)
model.cs2 = pyo.Constraint(model.timestep, model.ev_id, rule=grid_limit)
model.cs3 = pyo.Constraint(model.timestep, model.ev_id, rule=max_charging_limit)
model.cs4 = pyo.Constraint(model.timestep, model.ev_id, rule=max_discharging_limit)
model.cs5 = pyo.Constraint(model.timestep, model.ev_id, rule=soc_rules)
model.cs6 = pyo.Constraint(model.timestep, model.ev_id, rule=charging_dynamics)
model.cs8 = pyo.Constraint(model.timestep, model.ev_id, rule=mutual_exclusivity_charging)
model.cs9 = pyo.Constraint(model.timestep, model.ev_id, rule=mutual_exclusivity_discharging)
model.cs10 = pyo.Constraint(model.timestep, model.ev_id, rule=no_charge_when_no_car)
model.cs11 = pyo.Constraint(model.timestep, model.ev_id, rule=no_discharge_when_no_car)
model.cs12 = pyo.Constraint(model.timestep, model.ev_id, rule=pv_use)
model.cs13 = pyo.Constraint(model.timestep, model.ev_id, rule=pv_avail)
timestep_set = pyo.RangeSet(0, length_time_load_pv-1)
# cost minimization
def obj_fun(m):
return (sum([((m.charging_signal[i, ev] * evse_max_power - m.used_pv[i, ev]) / time_steps_per_hour) * m.price[i] +
((m.discharging_signal[i, ev] * evse_max_power * discharging_eff) / time_steps_per_hour) * m.tariff[i]
for i in m.timestep for ev in range(n_evs)]))
# change solver here to glpk if gurobi not configured
model.obj = pyo.Objective(rule=obj_fun, sense=pyo.minimize)
opt = pyo.SolverFactory('gurobi')#, executable="/home/enzo/Downloads/gurobi10.0.2_linux64/gurobi1002/linux64/")
# for quicker solving
opt.options['mipgap'] = 0.005
# print additional information
res = opt.solve(model, tee=True)
print(res)
# extract actions array for each time step, this is the result of the optimization
actions = [np.array([model.charging_signal[i,j].value + model.discharging_signal[i,j].value for j in range(n_evs)]) for i in range(length_time_load_pv)]
actions = pd.DataFrame({"action": actions})
# set the same index as for the RL agent and the other benchmarks
actions.index = pd.date_range(start="2020-01-01 00:00", end="2020-12-30 23:59", freq="15T")
actions["hid"] = actions.index.hour + actions.index.minute/60
# plot the resulting action curve from the pyomo optimization
len_day = 24*4
action_plot = []
for i in range(len_day):
action_plot.append(actions.groupby("hid").mean()["action"].reset_index(drop=True)[i].mean())
plt.plot(action_plot)
plt.show()
# feed the resulting actions into the FleetRL environment to get log data and KPIs
lin_norm_vec_env.reset()
start_time = lin_norm_vec_env.env_method("get_start_time")[0]
end_time = pd.to_datetime(start_time) + dt.timedelta(hours=n_steps)
env_actions = actions.loc[(actions.index >= start_time) & (actions.index <= end_time), "action"].reset_index(drop=True)
for i in range(n_steps*time_steps_per_hour):
lin_norm_vec_env.step([np.multiply(np.ones(n_evs), env_actions[i])])
# get log from environment
lin_log: pd.DataFrame = lin_norm_vec_env.env_method("get_log")[0]
# plotting analog to RL evaluation
lin_log.reset_index(drop=True, inplace=True)
lin_log = lin_log.iloc[0:-2]
real_power_lin = []
for i in range(lin_log.__len__()):
lin_log.loc[i, "hour_id"] = (lin_log.loc[i, "Time"].hour + lin_log.loc[i, "Time"].minute / 60)
mean_per_hid_lin = lin_log.groupby("hour_id").mean()["Charging energy"].reset_index(drop=True)
mean_all_lin = []
for i in range(mean_per_hid_lin.__len__()):
mean_all_lin.append(np.mean(mean_per_hid_lin[i]))
mean = pd.DataFrame()
mean["Distributed charging"] = np.multiply(mean_all_lin, 4)
mean.plot()
plt.xticks([0,8,16,24,32,40,48,56,64,72,80,88]
,["00:00","02:00","04:00","06:00","08:00","10:00","12:00","14:00","16:00","18:00","20:00","22:00"],
rotation=45)
plt.legend()
plt.grid(alpha=0.2)
plt.ylabel("Charging power in kW")
max = lin_log.loc[0, "Observation"][-10]
plt.ylim([-max * 1.2, max * 1.2])
plt.show()
# save log as pickle
lin_log.to_pickle("lin_log.pickle")