Source code for fleetrl.benchmarking.night_charging

import math
from copy import copy

from fleetrl.fleet_env.fleet_environment import FleetEnv
from fleetrl.benchmarking.benchmark import Benchmark

from stable_baselines3.common.vec_env import SubprocVecEnv, VecNormalize
from stable_baselines3.common.env_util import make_vec_env

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

[docs] class NightCharging(Benchmark): def __init__(self, n_steps: int, n_evs: int, n_episodes: int = 1, n_envs: int = 1, time_steps_per_hour: int = 4): self.n_steps = n_steps self.n_evs = n_evs self.n_episodes = n_episodes self.n_envs = n_envs self.time_steps_per_hour = time_steps_per_hour self.env_config = None
[docs] def run_benchmark(self, use_case: str, env_kwargs: dict, seed: int = None ) -> pd.DataFrame: night_vec_env = make_vec_env(FleetEnv, n_envs=self.n_envs, vec_env_cls=SubprocVecEnv, env_kwargs=env_kwargs, seed=seed) night_norm_vec_env = VecNormalize(venv=night_vec_env, norm_obs=True, norm_reward=True, training=True, clip_reward=10.0) env_config = env_kwargs["env_config"] env = FleetEnv(env_config) df = env.db df_leaving_home = df[(df['Location'].shift() == 'home') & (df['Location'] == 'driving')] earliest_dep_time = df_leaving_home['date'].dt.time.min() day_of_earliest_dep = df_leaving_home[df_leaving_home['date'].dt.time == earliest_dep_time]['date'].min() earliest_dep = earliest_dep_time.hour + earliest_dep_time.minute / 60 evse = env.load_calculation.evse_max_power cap = env.ev_config.init_battery_cap target_soc = env.ev_config.target_soc eff = env.ev_config.charging_eff max_time_needed = target_soc * cap / eff / evse # time needed to charge to target soc from 0 difference = earliest_dep - max_time_needed starting_time = (24 + difference) if starting_time > 24: starting_time = 23.99 # always start just before midnight charging_hour = int(math.modf(starting_time)[1]) minutes = np.asarray([0, 15, 30, 45]) # split number and decimals, use decimals and choose the closest minute closest_index = np.abs(minutes - int(math.modf(starting_time)[0] * 60)).argmin() charging_minute = minutes[closest_index] episode_length = self.n_steps n_episodes = self.n_episodes night_norm_vec_env.reset() charging = False for i in range(episode_length * self.time_steps_per_hour * n_episodes): if night_norm_vec_env.env_method("is_done")[0]: night_norm_vec_env.reset() time: pd.Timestamp = night_norm_vec_env.env_method("get_time")[0] if ((time.hour >= 11) and (time.hour <= 14)) and (use_case == "ct"): night_norm_vec_env.step( ([np.clip(np.multiply(np.ones(self.n_evs), night_norm_vec_env.env_method("get_dist_factor")[0]), 0, 1)])) continue time: pd.Timestamp = night_norm_vec_env.env_method("get_time")[0] if (((charging_hour <= time.hour) and (charging_minute <= time.minute)) or (charging)): if not charging: charging_start: pd.Timestamp = copy(time) charging = True night_norm_vec_env.step([np.ones(self.n_evs)]) else: night_norm_vec_env.step([np.zeros(self.n_evs)]) if charging and ((time - charging_start).total_seconds() / 3600 > int(max_time_needed)): charging = False night_log: pd.DataFrame = night_norm_vec_env.env_method("get_log")[0] night_log.reset_index(drop=True, inplace=True) night_log = night_log.iloc[0:-2] self.env_config = env_kwargs["env_config"] return night_log
[docs] def plot_benchmark(self, night_log: pd.DataFrame, ) -> None: night_log["hour_id"] = (night_log["Time"].dt.hour + night_log["Time"].dt.minute / 60) mean_per_hid_night = night_log.groupby("hour_id").mean()["Charging energy"].reset_index(drop=True) mean_all_night = [] for i in range(mean_per_hid_night.__len__()): mean_all_night.append(np.mean(mean_per_hid_night[i])) mean_night = pd.DataFrame() mean_night["Night charging"] = np.multiply(mean_all_night, 4) mean_night.plot() plt.xticks([0, 8, 16, 24, 32, 40, 48, 56, 64, 72, 80, 88] , ["00:00", "02:00", "04:00", "06:00", "08:00", "10:00", "12:00", "14:00", "16:00", "18:00", "20:00", "22:00"], rotation=45) plt.legend() plt.grid(alpha=0.2) plt.ylabel("Charging power in kW") price_lookahead = self.env_config["price_lookahead"] * int(self.env_config["include_price"]) bl_pv_lookahead = self.env_config["bl_pv_lookahead"] number_of_lookaheads = sum([int(self.env_config["include_pv"]), int(self.env_config["include_building"])]) # check observer module for building of observation list power_index = self.n_evs * 6 + 2 * (price_lookahead+1) + number_of_lookaheads * (bl_pv_lookahead+1) + 1 max_val = night_log.loc[0, "Observation"][power_index] plt.ylim([-max_val * 1.2, max_val * 1.2]) plt.show()