Source code for fleetrl.agent_eval.basic_evaluation

import os.path
import json

from fleetrl.fleet_env.fleet_environment import FleetEnv
from fleetrl.benchmarking.benchmark import Benchmark

from stable_baselines3.common.vec_env import SubprocVecEnv, VecNormalize
from stable_baselines3.common.env_util import make_vec_env
from stable_baselines3 import PPO
from stable_baselines3.common.evaluation import evaluate_policy

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import datetime as dt
import matplotlib.dates as mdates

import seaborn as sns
import matplotlib
import plotly.graph_objects as go
from plotly.subplots import make_subplots

matplotlib.rcParams.update({'font.size': 16})

from fleetrl.agent_eval.evaluation import Evaluation


[docs] class BasicEvaluation(Evaluation): def __init__(self, n_steps: int, n_evs: int, n_envs: int = 1, n_episodes: int = 1): self.n_steps = n_steps self.n_evs = n_evs self.n_envs = n_envs self.n_episodes = n_episodes self.env_kwargs = None @staticmethod def _change_param(env_kwargs: dict, key: str, val): if env_kwargs["env_config"].__class__ == dict: # env_config is a dict which we can modify directly env_kwargs["env_config"][key] = val elif env_kwargs["env_config"].__class__ == str: # env_config is a file path to a json file conf_path = env_kwargs["env_config"] # check that file exists and open json assert os.path.isfile(conf_path), "Config file path not found" with open(f'{conf_path}', 'r') as file: env_config = json.load(file) # write episode length parm into dict env_config[key] = val # replace path string with dict to use modified env_config from now on env_kwargs["env_config"] = env_config else: raise TypeError("Config parameter not specified correctly. Either dict or valid path.") return env_kwargs
[docs] def evaluate_agent(self, env_kwargs: dict, norm_stats_path: str, model_path: str, seed: int = None): env_kwargs = self._change_param(env_kwargs=env_kwargs, key="episode_length", val=self.n_steps) self.env_kwargs = env_kwargs eval_vec_env = make_vec_env(FleetEnv, n_envs=self.n_envs, vec_env_cls=SubprocVecEnv, seed=seed, env_kwargs=env_kwargs) eval_norm_vec_env = VecNormalize(venv=eval_vec_env, norm_obs=True, norm_reward=True, training=True, clip_reward=10.0) eval_norm_vec_env.load(load_path=norm_stats_path, venv=eval_norm_vec_env) model = PPO.load(model_path, env=eval_norm_vec_env, custom_objects={"observation_space": eval_norm_vec_env.observation_space, "action_space": eval_norm_vec_env.action_space}) mean_reward, _ = evaluate_policy(model, eval_norm_vec_env, n_eval_episodes=self.n_episodes, deterministic=True) print(mean_reward) log_RL = model.env.env_method("get_log")[0] log_RL.reset_index(drop=True, inplace=True) log_RL = log_RL.iloc[0:-2] return log_RL
[docs] def compare(self, rl_log: pd.DataFrame, benchmark_log: pd.DataFrame): rl_cashflow = rl_log["Cashflow"].sum() rl_reward = rl_log["Reward"].sum() rl_deg = rl_log["Degradation"].sum() rl_overloading = rl_log["Grid overloading"].sum() rl_soc_violation = rl_log["SOC violation"].sum() rl_n_violations = rl_log[rl_log["SOC violation"] > 0]["SOC violation"].size rl_soh = rl_log["SOH"].iloc[-1] benchmark_cashflow = benchmark_log["Cashflow"].sum() benchmark_reward = benchmark_log["Reward"].sum() benchmark_deg = benchmark_log["Degradation"].sum() benchmark_overloading = benchmark_log["Grid overloading"].sum() benchmark_soc_violation = benchmark_log["SOC violation"].sum() benchmark_n_violations = benchmark_log[benchmark_log["SOC violation"] > 0]["SOC violation"].size benchmark_soh = benchmark_log["SOH"].iloc[-1] print(f"RL reward: {rl_reward}") print(f"DC reward: {benchmark_reward}") print(f"RL cashflow: {rl_cashflow}") print(f"DC cashflow: {benchmark_cashflow}") total_results = pd.DataFrame() total_results["Category"] = ["Reward", "Cashflow", "Average degradation per EV", "Overloading", "SOC violation", "# Violations", "SOH"] total_results["RL-based charging"] = [rl_reward, rl_cashflow, np.round(np.mean(rl_deg), 5), rl_overloading, rl_soc_violation, rl_n_violations, np.round(np.mean(rl_soh), 5)] total_results["benchmark charging"] = [benchmark_reward, benchmark_cashflow, np.round(np.mean(benchmark_deg), 5), benchmark_overloading, benchmark_soc_violation, benchmark_n_violations, np.round(np.mean(benchmark_soh), 5)] print(total_results) rl_log["hour_id"] = (rl_log["Time"].dt.hour + rl_log["Time"].dt.minute / 60) benchmark_log["hour_id"] = (benchmark_log["Time"].dt.hour + benchmark_log["Time"].dt.minute / 60) mean_per_hid_rl = rl_log.groupby("hour_id").mean()["Charging energy"].reset_index(drop=True) mean_all_rl = [] for i in range(mean_per_hid_rl.__len__()): mean_all_rl.append(np.mean(mean_per_hid_rl[i])) mean_per_hid_benchmark = benchmark_log.groupby("hour_id").mean()["Charging energy"].reset_index(drop=True) mean_all_benchmark = [] for i in range(mean_per_hid_benchmark.__len__()): mean_all_benchmark.append(np.mean(mean_per_hid_benchmark[i])) mean_both = pd.DataFrame() mean_both["RL"] = np.multiply(mean_all_rl, 4) mean_both["benchmark charging"] = np.multiply(mean_all_benchmark, 4) mean_both.plot() plt.xticks([0, 8, 16, 24, 32, 40, 48, 56, 64, 72, 80, 88] , ["00:00", "02:00", "04:00", "06:00", "08:00", "10:00", "12:00", "14:00", "16:00", "18:00", "20:00", "22:00"], rotation=45) plt.legend() plt.grid(alpha=0.2) plt.ylabel("Charging power in kW") max = rl_log.loc[0, "Observation"][-10] plt.ylim([-max * 1.2, max * 1.2]) plt.show()
[docs] def plot_soh(self, rl_log, benchmark_log): # Create a date range from Jan to Dec # Create a date range from Jan to Dec with a 15-minute resolution date_range = pd.date_range(start=rl_log["Time"].iloc[0], end=rl_log["Time"].iloc[-1], freq='15min') # Create a figure fig, ax = plt.subplots() # Rescale the index of the dataframes to match the date range rescaled_rl_log = rl_log.copy() rescaled_rl_log.index = date_range[:len(rl_log)] rescaled_benchmark_log = benchmark_log.copy() rescaled_benchmark_log.index = date_range[:len(benchmark_log)] # Plot the data ax.plot(rescaled_benchmark_log.index, rescaled_benchmark_log['SOH'].apply(lambda x: x[0]), label='Dumb', color='red') ax.plot(rescaled_rl_log.index, rescaled_rl_log['SOH'].apply(lambda x: x[0]), label='RL', color='blue') # Set the title and labels ax.set_title('State of Health Over Time') ax.set_xlabel('Time') ax.set_ylabel('State of Health') ax.legend() ax.xaxis.set_major_locator(mdates.MonthLocator()) ax.xaxis.set_major_formatter(mdates.DateFormatter('%b')) ax.set_xticklabels(ax.get_xticklabels()[0:12])[0:12] ax.set_xticks(ax.get_xticks()[0:12])[0:12] # Show the plot plt.grid(alpha=0.2) plt.show()
[docs] def plot_violations(self, rl_log, benchmark_log): if len(rl_log[rl_log["SOC violation"] > 0]) == 0: print("No violations found.") return None fig, axs = plt.subplots(1, 1, figsize=(3, 3)) # Plot RL data rl_log.loc[rl_log["SOC violation"] > 0, "SOC violation"].sort_values(ascending=False).reset_index( drop=True).plot.bar(color="blue", ax=axs) axs.set_title("RL-based charging") axs.set_xticks([0, 25, 50, 75, 100]) axs.set_ylim([0, 0.4]) axs.grid(alpha=0.2) axs.set_xlabel("Violations") axs.set_ylabel("Missing SOC per violation") plt.tight_layout() plt.show()
[docs] def plot_action_dist(self, rl_log, benchmark_log): if rl_log['Action'][0].__class__ == np.ndarray: rl_log['Action'] = rl_log['Action'].apply(lambda x: x[0]) benchmark_log['Action'] = benchmark_log['Action'].apply(lambda x: x[0]) # Create a figure with two subplots side by side fig, axs = plt.subplots(1, 2, figsize=(8, 3)) # Plot the distribution of actions for the RL-based strategy on the first subplot axs[0].hist(rl_log['Action'], bins=50, color='blue', edgecolor='black') axs[0].set_title('Distribution of Actions for RL-based Charging') axs[0].set_xlabel('Action') axs[0].set_ylabel('Frequency') # Plot the distribution of actions for the benchmark strategy on the second subplot axs[1].hist(benchmark_log['Action'], bins=50, color='red', edgecolor='black') axs[1].set_title('Distribution of actions for benchmark Charging') axs[1].set_xlabel('Action') axs[1].set_ylabel('Frequency') plt.tight_layout() plt.show()
[docs] def plot_detailed_actions(self, start_date: str | pd.Timestamp, end_date: str | pd.Timestamp, rl_log: pd.DataFrame=None, uc_log: pd.DataFrame=None, dist_log: pd.DataFrame=None, night_log: pd.DataFrame=None, lp_log: pd.DataFrame=None): evse_power = self.env_kwargs["env_config"]["custom_ev_charger_power_in_kw"] assert any(df is not None for df in [rl_log, uc_log, dist_log, night_log]), "No log provided." chosen_dfs = [] log_names = [] if rl_log is not None: rl_log = rl_log[(rl_log["Time"] >= start_date) & (rl_log["Time"] <= end_date)] chosen_dfs.append(self._get_from_obs(rl_log)) log_names.append("RL-based charging") if uc_log is not None: uc_log = uc_log[(uc_log["Time"] >= start_date) & (uc_log["Time"] <= end_date)] chosen_dfs.append(self._get_from_obs(uc_log)) log_names.append("Uncontrolled charging") if dist_log is not None: dist_log = dist_log[(dist_log["Time"] >= start_date) & (dist_log["Time"] <= end_date)] chosen_dfs.append(self._get_from_obs(dist_log)) log_names.append("Distributed charging") if night_log is not None: night_log = night_log[(night_log["Time"] >= start_date) & (night_log["Time"] <= end_date)] chosen_dfs.append(self._get_from_obs(log=night_log)) log_names.append("Night charging") if lp_log is not None: lp_log = lp_log[(lp_log["Time"] >= start_date) & (lp_log["Time"] <= end_date)] chosen_dfs.append(self._get_from_obs(log=lp_log)) log_names.append("Linear optimization") # Create a subplot with 3 rows and 1 column, without sharing the x-axis fig = make_subplots(rows=len(chosen_dfs)+1, cols=1, shared_xaxes=False, vertical_spacing=0.1, specs=[[{'secondary_y': True}] for _ in range(len(chosen_dfs)+1)], subplot_titles=(["Load, PV and Price", *[f"{log_names[i]} - Money spent: €" + str(np.round(chosen_dfs[i]["CF"].sum() * -1, 1)) for i in range(len(chosen_dfs))]]), column_widths=[1000], row_heights=[270 for _ in range(len(chosen_dfs)+1)]) # Add traces for the first subplot fig.add_trace(go.Scatter(x=chosen_dfs[0]["Date"], y=chosen_dfs[0]["Load"], name='Building Load', legendgroup="1"), row=1, col=1) fig.add_trace(go.Scatter(x=chosen_dfs[0]["Date"], y=chosen_dfs[0]["PV"], name='PV', legendgroup="1"), row=1, col=1) fig.add_trace(go.Scatter(x=chosen_dfs[0]["Date"], y=chosen_dfs[0]["Price"], name='Price', legendgroup="1", legendgrouptitle=dict(text="Use-case info")), row=1, col=1, secondary_y=True) # # Add traces for the second subplot (you can change these as needed) # fig.add_trace(go.Scatter(x=df["Date"], y=df_real["Action"], name='Charging power', legendgroup="2"), row=2, col=1) # fig.add_trace(go.Scatter(x=df["Date"], y=df_real["SOC"], name='SOC', legendgroup="2"), row=2, col=1, secondary_y=True) for i in range(len(chosen_dfs)): # Add traces for the second subplot (you can change these as needed) fig.add_trace(go.Scatter(x=chosen_dfs[i]["Date"], y=chosen_dfs[i]["Action"], name='Charging power', legendgroup=f"{i+2}", legendgrouptitle=dict(text=log_names[i])), row=i+2, col=1) fig.add_trace(go.Scatter(x=chosen_dfs[i]["Date"], y=chosen_dfs[i]["SOC"], name='SOC', legendgroup=f"{i+2}"), row=i+2, col=1, secondary_y=True) fig.update_xaxes(row=1, range=[start_date, end_date], matches="x") fig.update_xaxes(row=2, range=[start_date, end_date], matches="x") for i in range(len(chosen_dfs)+1): fig.update_xaxes(row=i + 1, col=1, matches='x') for i in range(1, len(chosen_dfs)+1): fig.update_yaxes( tickvals=[-evse_power, 0, evse_power], # Values at which ticks on this axis appear ticktext=[f'-{evse_power}', '0', f'{evse_power}'], # Text that appears at the ticks row=i + 1, col=1, # Row and column of the subplot to update (adjust as needed) secondary_y=False # Set to True if updating the secondary y-axis ) for i in range(1, len(chosen_dfs)+1): fig.update_yaxes( tickvals=[0, 0.8], # Values at which ticks on this axis appear ticktext=['0', '0.8'], # Text that appears at the ticks row=i + 1, col=1, # Row and column of the subplot to update (adjust as needed) secondary_y=True # Set to True if updating the secondary y-axis ) min_price = chosen_dfs[0]["Price"].min() max_price = chosen_dfs[0]["Price"].max() for i in range(1): fig.update_yaxes( tickvals = [min_price, max_price], # Values at which ticks on this axis appear ticktext = [f'{np.round(min_price, 0)}', f'{np.round(max_price, 0)}'], # Text that appears at the ticks row = i + 1, col=1, # Row and column of the subplot to update (adjust as needed) secondary_y = True # Set to True if updating the secondary y-axis ) fig.update_yaxes(row=1, col=1, range=[min_price-2, max_price+2], secondary_y=True) for i in range(1, len(chosen_dfs)+1): fig.update_yaxes(row=i+1, col=1, range=[-evse_power*1.1, evse_power*1.1], secondary_y=False) # Labels for primary y-axes primary_labels = ["kW" for _ in range(len(chosen_dfs)+1)] # Labels for secondary y-axes secondary_labels = ["ct/kWh", *["SOC" for _ in range(len(chosen_dfs))]] # Update primary y-axes labels for i, label in enumerate(primary_labels): fig.update_yaxes(title_text=label, row=i + 1, col=1, secondary_y=False) # Update secondary y-axes labels for i, label in enumerate(secondary_labels): fig.update_yaxes(title_text=label, row=i + 1, col=1, secondary_y=True) fig.update_layout( # width=1050, # height=1400, margin=dict(l=35, r=45, t=25, b=25), font=dict(size=16) ) fig.update_layout( legend_tracegroupgap=20, ) # Update the x-axis to show the date without the year fig.update_xaxes( tickformat="%m-%d", # Display only month and day ) # Other plot updates # ... fig.update_xaxes( tickformat="%m/%d %H:%M" # Display abbreviated month name, day, hours, and minutes ) # return the plot return fig
def _get_from_obs(self, log: dict): obs = log["Observation"] act = log["Charging energy"] cf = log["Cashflow"] env_config = self.env_kwargs["env_config"] bl_pv_lookahead = env_config["bl_pv_lookahead"] pr_lookahead = env_config["price_lookahead"] length = len(log) # Check observer class to see how observation list is built up date = log["Time"] first = 0 # first entry has index 0 last = self.n_evs - 1 # soc for each car if self.n_evs > 1: soc = [obs[i][first:last].mean() for i in range(length)] else: soc = [obs[i][first] for i in range(length)] first = self.n_evs last = self.n_evs * 2 - 1 # hours left at charger for each car if self.n_evs > 1: hours_left = [obs[i][first:last].mean() for i in range(length)] else: hours_left = [obs[i][first] for i in range(length)] first = self.n_evs * 2 last = self.n_evs * 2 + pr_lookahead # price lookahead gives price in hour, hour+1, etc. price = [obs[i][first] for i in range(length)] first = self.n_evs * 2 + pr_lookahead + 1 last = self.n_evs * 2 + pr_lookahead * 2 + 1 # tariff paid when discharging, with lookahead tariff = [obs[i][first] for i in range(length)] first = self.n_evs * 2 + pr_lookahead * 2 + 2 last = self.n_evs * 2 + pr_lookahead * 2 + 2 + bl_pv_lookahead # building load lookahead building_load = [obs[i][first] for i in range(length)] first = self.n_evs * 2 + pr_lookahead * 2 + 2 + bl_pv_lookahead + 1 last = self.n_evs * 2 + pr_lookahead * 2 + bl_pv_lookahead * 2 + 1 # pv has same lookahead as building pv = [obs[i][first] for i in range(length)] free_cap = [obs[i][-8] / obs[i][-9] for i in range(length)] # free grid capacity in MW / total grid capacity time_steps_per_hour = env_config["time_steps_per_hour"] # act is charging energy in kWh, we want to display the currently drawn power in kW first = 0 # first entry has index 0 last = self.n_evs - 1 # soc for each car if self.n_evs > 1: action = [act[i][first:last].sum() * time_steps_per_hour for i in range(length)] # Going from kWh to kW else: action = [act[i][first] * time_steps_per_hour for i in range(length)] # Going from kWh to kW df = pd.DataFrame({ 'Date': date, 'SOC': soc, 'Load': building_load, 'PV': pv, 'Price': price, 'Action': action, 'Free cap': free_cap, 'CF': cf }) return df