Source code for fleetrl.fleet_env.fleet_environment

import os
import json
import gymnasium as gym
import numpy as np
import pandas as pd
from typing import Literal
import datetime
import matplotlib.pyplot as plt
import matplotlib.patches as patches

from fleetrl.fleet_env.config.ev_config import EvConfig
from fleetrl.fleet_env.config.score_config import ScoreConfig
from fleetrl.fleet_env.config.time_config import TimeConfig

from fleetrl.fleet_env.episode import Episode

from fleetrl.utils.data_processing.data_processing import DataLoader
from fleetrl.utils.ev_charging.ev_charger import EvCharger
from fleetrl.utils.load_calculation.load_calculation import LoadCalculation, CompanyType

from fleetrl.utils.normalization.normalization import Normalization
from fleetrl.utils.normalization.oracle_normalization import OracleNormalization
from fleetrl.utils.normalization.unit_normalization import UnitNormalization

from fleetrl.utils.observation.observer_with_building_load import ObserverWithBuildingLoad
from fleetrl.utils.observation.observer_price_only import ObserverPriceOnly
from fleetrl.utils.observation.observer import Observer
from fleetrl.utils.observation.observer_with_pv import ObserverWithPV
from fleetrl.utils.observation.observer_bl_pv import ObserverWithBoth
from fleetrl.utils.observation.observer_soc_time_only import ObserverSocTimeOnly

from fleetrl.utils.time_picker.random_time_picker import RandomTimePicker
from fleetrl.utils.time_picker.static_time_picker import StaticTimePicker
from fleetrl.utils.time_picker.eval_time_picker import EvalTimePicker
from fleetrl.utils.time_picker.time_picker import TimePicker

from fleetrl.utils.battery_degradation.batt_deg import BatteryDegradation
from fleetrl.utils.battery_degradation.empirical_degradation import EmpiricalDegradation
from fleetrl.utils.battery_degradation.rainflow_sei_degradation import RainflowSeiDegradation
from fleetrl.utils.battery_degradation.log_data_deg import LogDataDeg

from fleetrl.utils.event_manager.event_manager import EventManager

from fleetrl.utils.data_logger.data_logger import DataLogger

from fleetrl.utils.schedule.schedule_generator import ScheduleGenerator, ScheduleType

from fleetrl.utils.rendering.render import ParkingLotRenderer

[docs] class FleetEnv(gym.Env): """ FleetRL: Reinforcement Learning environment for commercial vehicle fleets. Author: Enzo Alexander Cording - https://github.com/EnzoCording Master's thesis project, M.Sc. Sustainable Energy Engineering @ KTH Copyright (c) 2023, Enzo Cording This framework is built on the gymnasium core API and inherits from it. __init__, reset, and step are implemented, calling other modules and functions where needed. Base-derived class architecture is implemented, and the code is structured in a modular manner to enable improvements or changes in the model. Only publicly available data or own-generated data has been used in this implementation. The agent only sees information coming from the chargers: SOC, how long the vehicle is still plugged in, etc. However, this framework matches the number of chargers with the number of cars to reduce complexity. If more cars than chargers should be modelled, an allocation algorithm is necessary. What is more, battery degradation is modelled in this environment. In this case, the information of the car is required (instead of the charger). Modelling is facilitated by matching cars and chargers one-to-one. Therefore, throughout the code, "car" and "ev_charger" might be used interchangeably as indices. Note that this does not present a simplification from the agent perspective because the agent does only handles the SOC and time left at the charger, regardless of whether the vehicle is matching the charger one-to-one or not. """ def __init__(self, env_config: str | dict): """ :param env_config: String to specify path of json config file, or dict with config The following items are to be specified in the json or dict config: - data_path: String to specify the absolute path of the input folder - schedule_name: String to specify file name of schedule - building_name: String to specify building load data, includes pv as well - pv_name: String to optionally specify own pv dataset - include_building: Flag to include building or not - include_pv: Flag to include pv or not - include_price: Flag to include price or not - time_picker: Specify whether to pick time "static", "random" or "eval" - target_soc: Target SOC that needs to be fulfilled before leaving for next trip - max_batt_cap_in_all_use_cases: The largest battery size to be considered in the model - init_soh: Initial state of health of batteries. SOH=1 -> no degradation - deg_emp: Flag to use empirical degradation. Default False - ignore_price_reward: Flag to ignore price reward - ignore_overloading_penalty: Flag to ignore overloading penalty - ignore_invalid_penalty: Flag to ignore invalid action penalty - ignore_overcharging_penalty: Flag to ignore overcharging the battery penalty - episode_length: Length of episode in hours - log_data: Log SOC and SOH to csv files - calculate_degradation: Calculate degradation flag - verbose: Print statements - normalize_in_env: Conduct normalization in environment - use_case: String to specify the use-case - aux: Flag to include auxiliary information in the model - gen_schedule: Flag to generate schedule or not - gen_start_date: Start date of the schedule - gen_end_date: End date of the schedule - gen_name: File name of the schedule - gen_n_evs: How many EVs a schedule should be generated for - spot_markup: markup on the spot price: new_price = spot + X ct/kWh - spot_mul: Multiplied on the price: New price = (spot + markup) * (1+X) - feed_in_ded: Deduction of the feed-in tariff: new_feed_in = (1-X) * feed_in - seed: seed for random number generators - real_time Bool for specifying real time flag """ # call __init__() of parent class to ensure inheritance chain super().__init__() # Check that the input parameter config is passed properly - either as json or dict assert (env_config.__class__ == dict) or (env_config.__class__ == str), 'Invalid config type.' if env_config.__class__ == str: assert os.path.isfile(env_config), f'Config file not found at {env_config}.' self.env_config = self.read_config(conf_path=env_config) else: self.env_config = env_config # setting seed self.seed = self.env_config["seed"] np.random.seed(self.seed) # Loading configs self.time_conf = TimeConfig(self.env_config) self.ev_config = EvConfig(self.env_config) self.score_config = ScoreConfig(self.env_config) # Setting flags for the type of environment to build # NOTE: observations are appended to the db in the order specified here self.include_price = self.env_config["include_price"] self.include_building_load = self.env_config["include_building"] self.include_pv = self.env_config["include_pv"] self.aux_flag = self.env_config["aux"] # include auxiliary information # conduct normalization of observations self.normalize_in_env = self.env_config["normalize_in_env"] # Setting paths and file names # path for input files, needs to be the same for all inputs self.path_name = self.env_config["data_path"] # EV schedule database # generating own schedules or importing them self.generate_schedule = self.env_config["gen_schedule"] self.schedule_name = self.env_config["schedule_name"] self.gen_name = self.env_config["gen_name"] self.gen_start_date = self.env_config["gen_start_date"] self.gen_end_date = self.env_config["gen_end_date"] self.gen_n_evs = self.env_config["gen_n_evs"] # Price databases self.spot_name = self.env_config["price_name"] self.tariff_name = self.env_config["tariff_name"] # Building load database self.building_name = self.env_config["building_name"] # PV database is the same in this case if self.env_config["pv_name"] is not None: self.pv_name = self.env_config["pv_name"] else: self.pv_name = self.env_config["building_name"] use_case = self.env_config["use_case"] # Specify the company type and size of the battery self.company: CompanyType = None self.schedule_type: ScheduleType = None self.specify_company_and_battery_size(use_case) # Automatic schedule generation if specified if self.generate_schedule: self.auto_gen() # Make sure that data paths are correct and point to existing files self.check_data_paths(self.path_name, self.schedule_name, self.spot_name, self.building_name, self.pv_name) # Changing markups on spot prices if specified in config file (e.g. 20% on top on spot prices) self.change_markups() # scaling price conf with battery capacity. Each use-case has different battery sizes, so a full charge # would have different penalty ranges with different battery capacities. Normalized to max capacity (60 kWh) # if different use-cases are compared, change max_batt_cap to the highest battery capacity in kWh self.max_batt_cap_in_all_use_cases = self.env_config["max_batt_cap_in_all_use_cases"] self.score_config.price_multiplier = (self.score_config.price_multiplier * (self.max_batt_cap_in_all_use_cases / self.ev_config.init_battery_cap)) # Changing parameters, if specified self.time_conf.episode_length = self.env_config["episode_length"] self.ev_config.target_soc = self.env_config["target_soc"] # Changing ScoreConfig, if specified, e.g. setting some penalties to zero self.adjust_score_config() verbose = self.env_config["verbose"] # Set printing and logging parameters, false can increase training fps self.print_updates = verbose self.print_reward = verbose self.print_function = verbose self.calc_deg = self.env_config["calculate_degradation"] self.log_data = self.env_config["log_data"] # Event manager to check if a relevant event took place to pass to the agent self.event_manager: EventManager = EventManager() # Class simulating EV charging self.ev_charger: EvCharger = EvCharger(self.ev_config) # Choose time picker based on input string time_picker self.time_picker = self.choose_time_picker(self.env_config["time_picker"]) # Choose the right observer module based on the environment settings self.observer = self.choose_observer() # Instantiating episode object # Episode object contains all episode-specific information self.episode: Episode = Episode(self.time_conf) # Setting EV parameters self.eps = 0.005 # allowed SOC deviation from target: 0.5% self.initial_soh = self.env_config["init_soh"] # initial degree of battery degradation, assumed equal for all cars self.min_laxity: float = self.ev_config.min_laxity # How much excess time the car should at least have to charge # initiating variables inside __init__() that are needed for gym.Env self.info: dict = {} # Necessary for gym env (Double check because new implementation doesn't need it) # Loading the data logger for battery degradation self.deg_data_logger: LogDataDeg = LogDataDeg(self.episode) # Loading data logger for analysing results and everything else self.data_logger: DataLogger = DataLogger(self.time_conf.episode_length * self.time_conf.time_steps_per_hour) self.real_time = self.env_config["real_time"] # Loading the inputs self.data_loader: DataLoader = DataLoader(self.path_name, self.schedule_name, self.spot_name, self.tariff_name, self.building_name, self.pv_name, self.time_conf, self.ev_config, self.ev_config.target_soc, self.include_building_load, self.include_pv, self.real_time ) # get the total database self.db = self.data_loader.db if use_case == "ct": self.adjust_caretaker_lunch_soc() # first ID is 0 self.num_cars = self.db["ID"].max() + 1 # Target SoC - Vehicles should always leave with this SoC self.target_soc: np.ndarray = np.ones(self.num_cars) * self.ev_config.target_soc if self.env_config["include_building"]: max_load = max(self.db["load"]) else: max_load = 0 # building load not considered in that case # Instantiate load calculation with the necessary information """ Note: - Maximum building load is required to determine grid connection if value is not known. - Grid connection is sized at 1.1 times the maximum building load, or such that the charging - of 50% of EVs at full capacity causes a grid overloading. - This can be changed in the load calculation module, e.g. replacing it with a fixed value. """ self.load_calculation = LoadCalculation(env_config=self.env_config, company_type=self.company, num_cars=self.num_cars, max_load=max_load) # choosing degradation methodology: empirical linear or non-linear mathematical model if self.env_config["deg_emp"]: self.emp_deg: BatteryDegradation = EmpiricalDegradation(self.initial_soh, self.num_cars) else: self.sei_deg: BatteryDegradation = RainflowSeiDegradation(self.initial_soh, self.num_cars) # de-trend prices to make them usable as agent rewards if self.include_price: self.db = DataLoader.shape_price_reward(self.db, self.ev_config) """ - Normalizing observations (Oracle) or just concatenating (Unit) - Oracle is normalizing with the maximum values, that are assumed to be known - Unit doesn't normalize, but just concatenates, and parses data in the right format - Auxiliary flag is parsed, to include additional information or not - NB: If auxiliary data is changed, the observers, normalizers and dimensions have to be updated """ if self.normalize_in_env: self.normalizer: Normalization = OracleNormalization(self.db, self.include_building_load, self.include_pv, self.include_price, aux=self.aux_flag, ev_conf=self.ev_config, load_calc=self.load_calculation) else: self.normalizer: Normalization = UnitNormalization() # choose dimensions and bounds depending on settings low_obs, high_obs = self.detect_dim_and_bounds() self.observation_space = gym.spaces.Box( low=low_obs, high=high_obs, dtype=np.float32) # the action space is also continuous: -1 and 1 being the bounds (-100% to 100% of the EVSE kW power rating) self.action_space = gym.spaces.Box( low=-1, high=1, shape=(self.num_cars,), dtype=np.float32) self.render_mode = "human" self.pl_render: ParkingLotRenderer = ParkingLotRenderer()
[docs] def reset(self, **kwargs) -> tuple[np.array, dict]: """ :param kwargs: Necessary for gym inheritance :return: First observation (either normalized or not) and an info dict """ # reset degradation logs for new episode self.deg_data_logger.log = [] self.deg_data_logger.soc_log = [] # set done to False, since the episode just started self.episode.done = False # instantiate soh - depending on initial health settings self.episode.soh = np.ones(self.num_cars) * self.initial_soh # based on soh, instantiate battery capacity self.episode.battery_cap = self.episode.soh * self.ev_config.init_battery_cap # choose a start time based on the type of choice: same, random, deterministic self.episode.start_time = self.time_picker.choose_time(self.db, self.time_conf.freq, self.time_conf.end_cutoff) # calculate the finish time based on the episode length self.episode.finish_time = self.episode.start_time + np.timedelta64(self.time_conf.episode_length, 'h') # set the model time to the start time self.episode.time = self.episode.start_time # get observation from observer module obs = self.observer.get_obs(self.db, self.time_conf.price_lookahead, self.time_conf.bl_pv_lookahead, self.episode.time, ev_conf=self.ev_config, load_calc=self.load_calculation, aux=self.aux_flag, target_soc=self.target_soc) # get the first soc and hours_left observation self.episode.soc = obs["soc"] self.episode.hours_left = obs["hours_left"] if self.include_price: self.episode.price = obs["price"] self.episode.tariff = obs["tariff"] """ if time is insufficient due to unfavourable start date (for example loading an empty car with 15 min time left), soc is set in such a way that the agent always has a chance to fulfil the objective """ for car in range(self.num_cars): p_avail = min([self.ev_config.obc_max_power, self.load_calculation.evse_max_power]) time_needed = (self.target_soc[car] - self.episode.soc[car]) * self.episode.battery_cap[car] / p_avail # Gives some tolerance, check if hours_left > 0 because car has to be plugged in # Makes sure that enough laxity is present, in this case 50% is default if (self.episode.hours_left[car] > 0) and (self.ev_config.min_laxity * time_needed > self.episode.hours_left[car]): self.episode.soc[car] = (self.target_soc[car] - (time_needed * p_avail / self.episode.battery_cap[car]) / self.ev_config.min_laxity) if self.print_updates: print("Initial SOC modified due to unfavourable starting condition.") # soc for battery degradation self.episode.soc_deg = self.episode.soc.copy() # for battery degradation adjust to default soc, if soc is unknown in the beginning for car in range(self.num_cars): if self.episode.soc_deg[car] == 0: self.episode.soc_deg[car] = self.ev_config.def_soc # set the reward history back to an empty list, set cumulative reward to 0 self.episode.reward_history = [] self.episode.cumulative_reward = 0 self.episode.penalty_record = 0 # rebuild the observation vector with modified values obs["soc"] = self.episode.soc obs["hours_left"] = self.episode.hours_left if self.include_price: obs["price"] = self.episode.price obs["tariff"] = self.episode.tariff # Parse observation to normalization module norm_obs = self.normalizer.normalize_obs(obs) # Log first soc for battery degradation if self.calc_deg: self.deg_data_logger.log_soc(self.episode.soc_deg) if self.log_data and not self.episode.done: # obs action reward cashflow self.data_logger.log_data(self.episode.time, norm_obs, # normalized observation np.zeros(self.num_cars), # action 0.0, # reward 0.0, # cashflow 0.0, # penalties 0.0, # grid overloading 0.0, # soc missing on departure 0.0, # degradation np.zeros(self.num_cars), # log of charged energy in kWh self.episode.soh) # soh return norm_obs, self.info
[docs] def step(self, actions: np.array) -> tuple[np.array, float, bool, bool, dict]: """ The main logic of the EV charging problem is orchestrated in the step function. Input: Action on charging power for each EV Output: Next state, reward Intermediate processes: EV charging model, battery degradation, cost calculation, building load, penalties, etc. The step function runs as long as the done flag is False. Different functions and modules are called in this function to reduce the complexity and to distribute the tasks of the model. :param actions: Actions parsed by the agent, from -1 to 1, representing % of kW of the EVSE :return: Tuple containing next observation, reward, done, truncated and info dictionary """ self.episode.current_actions = actions while True: self.episode.time_conf.dt = self.get_next_dt() # get next dt in case time frequency changes self.episode.time_conf.time_steps_per_hour = int(1 / np.copy(self.episode.time_conf.dt)) self.episode.time_conf.minutes = self.get_next_minutes() # get next minutes in case time freq changes # define variables that are newly used every iteration cum_soc_missing = 0 # cumulative soc missing for each step there = self.db["There"][self.db["date"] == self.episode.time].values # plugged in y/n (before next time step) # parse the action to the charging function and receive the soc, next soc, reward and cashflow self.episode.soc, self.episode.next_soc, reward, cashflow, self.charge_log, self.episode.events = self.ev_charger.charge( self.db, self.num_cars, actions, self.episode, self.load_calculation, self.ev_config, self.time_conf, self.score_config, self.print_updates, self.target_soc) # set the soc to the next soc self.episode.old_soc = self.episode.soc.copy() self.episode.soc = self.episode.next_soc.copy() # save cashflow for print function self.episode.current_charging_expense = cashflow # calling the print function if self.print_function: self.print(actions) # check current load and pv for violation check if self.include_building_load: current_load = self.db.loc[self.db["date"] == self.episode.time, "load"].values[0] else: current_load = 0 if self.include_pv: current_pv = self.db.loc[self.db["date"] == self.episode.time, "pv"].values[0] else: current_pv = 0 # correct actions for spots where no car is plugged in corrected_actions = actions * there # check if connection has been overloaded and by how much overloaded_flag, overload_amount = self.load_calculation.check_violation(corrected_actions, self.db, current_load, current_pv) relative_loading = overload_amount / self.load_calculation.grid_connection + 1 # overload_penalty is calculated from a sigmoid function in score_conf if overloaded_flag: self.episode.events += 1 # relevant event detected overload_penalty = self.score_config.overloading_penalty(relative_loading) reward += overload_penalty self.episode.penalty_record += overload_penalty if self.print_updates: print(f"Grid connection of {self.load_calculation.grid_connection} kW has been overloaded:" f" {abs(overload_amount)} kW. Penalty: {round(overload_penalty, 3)}") # advance one time step self.episode.time += np.timedelta64(self.time_conf.minutes, 'm') # get the next observation entry from the dataset to get new arrivals or departures next_obs = self.observer.get_obs(self.db, self.time_conf.price_lookahead, self.time_conf.bl_pv_lookahead, self.episode.time, ev_conf=self.ev_config, load_calc=self.load_calculation, aux=self.aux_flag, target_soc=self.target_soc) next_obs_soc = next_obs["soc"] next_obs_time_left = next_obs["hours_left"] if self.include_price: next_obs_price = next_obs["price"] self.episode.price = next_obs_price next_obs_tariff = next_obs["tariff"] self.episode.tariff = next_obs_tariff # go through the stations and check whether the same car is still there, no car, or a new arrival for car in range(self.num_cars): # checks if a car just left and if rules were violated, e.g. didn't fully charge if (self.episode.hours_left[car] != 0) and (next_obs_time_left[car] == 0): self.episode.events += 1 # relevant event detected # caretaker is a special case because of the lunch break # it is not long enough to fully recharge, so a different target soc is applied if self.company == CompanyType.Caretaker: # lunch break case if (self.episode.time.hour > 11) and (self.episode.time.hour < 15): # check for soc violation if self.ev_config.target_soc_lunch - self.episode.soc[car] > self.eps: # penalty for not fulfilling charging requirement, square difference, scale and clip self.episode.events += 1 # relevant event detected soc_missing = self.ev_config.target_soc_lunch - self.episode.soc[car] cum_soc_missing += soc_missing #current_soc_pen = self.score_conf.penalty_soc_violation * soc_missing ** 2 #current_soc_pen = max(current_soc_pen, self.score_conf.clip_soc_violation) current_soc_pen = self.score_config.soc_violation_penalty(soc_missing) reward += current_soc_pen self.episode.penalty_record += current_soc_pen if self.print_updates: print(f"A car left the station without reaching the target SoC." f" Penalty: {round(current_soc_pen, 3)}") else: reward += self.score_config.fully_charged_reward # reward for fully charging the car # caretaker, other operation times, check for violation elif self.target_soc[car] - self.episode.soc[car] > self.eps: # current_soc_pen is calculated from a sigmoid function in score_conf self.episode.events += 1 # relevant event detected soc_missing = self.target_soc[car] - self.episode.soc[car] cum_soc_missing += soc_missing #current_soc_pen = self.score_conf.penalty_soc_violation * soc_missing ** 2 #current_soc_pen = max(current_soc_pen, self.score_conf.clip_soc_violation) current_soc_pen = self.score_config.soc_violation_penalty(soc_missing) reward += current_soc_pen self.episode.penalty_record += current_soc_pen if self.print_updates: print(f"A car left the station without reaching the target SoC." f" Penalty: {round(current_soc_pen, 3)}") else: reward += self.score_config.fully_charged_reward # reward for fully charging the car # other companies: if charging requirement wasn't met (with some tolerance eps) elif self.target_soc[car] - self.episode.soc[car] > self.eps: self.episode.events += 1 # relevant event detected # current_soc_pen is calculated from a sigmoid function in score_conf soc_missing = self.target_soc[car] - self.episode.soc[car] cum_soc_missing += soc_missing #current_soc_pen = self.score_conf.penalty_soc_violation * soc_missing ** 2 #current_soc_pen = max(current_soc_pen, self.score_conf.clip_soc_violation) current_soc_pen = self.score_config.soc_violation_penalty(soc_missing) reward += current_soc_pen self.episode.penalty_record += current_soc_pen if self.print_updates: print(f"A car left the station without reaching the target SoC." f" Penalty: {round(current_soc_pen, 3)}") else: reward += self.score_config.fully_charged_reward # reward for fully charging the car # still charging if (next_obs_time_left[car] != 0) and (self.episode.hours_left[car] != 0): self.episode.hours_left[car] -= self.time_conf.dt # no car in the next time step elif next_obs_time_left[car] == 0: self.episode.hours_left[car] = next_obs_time_left[car] self.episode.soc[car] = next_obs_soc[car] # new arrival in the next time step elif (self.episode.hours_left[car] == 0) and (next_obs_time_left[car] != 0): self.episode.events += 1 # relevant event self.episode.hours_left[car] = next_obs_time_left[car] self.episode.old_soc[car] = self.episode.soc[car] self.episode.soc[car] = next_obs_soc[car] # this shouldn't happen but if it does, an error is thrown else: raise TypeError("Observation format not recognized") # if battery degradation >= 10%, target SOC is increased to ensure sufficient kWh in the battery if self.episode.soh[car] <= 0.9: self.target_soc[car] = 0.9 self.episode.events += 1 # relevant event detected if self.print_updates and self.target_soc[car] != 0.9: print(f"Target SOC of Car {car} has been adjusted to 0.9 due to high battery degradation." f"Current SOH: {self.episode.soh[car]}") # Update SOH value for degradation calculations, wherever a car is plugged in for car in range(self.num_cars): if self.episode.hours_left[car] != 0: self.episode.soc_deg[car] = self.episode.soc[car] # if the finish time is reached, set done to True # The RL_agents agent then resets the environment if self.episode.time == self.episode.finish_time: self.episode.done = True self.episode.events += 1 # relevant event detected if self.calc_deg: self.deg_data_logger.add_log_entry() if self.print_updates: print(f"Episode done: {self.episode.done}") self.logged_data = self.data_logger.log # append to the reward history self.episode.cumulative_reward += reward self.episode.reward_history.append((self.episode.time, self.episode.cumulative_reward)) if self.print_reward: print(f"Reward signal: {round(reward, 3)}") print("---------") print("\n") next_obs["soc"] = self.episode.soc next_obs["hours_left"] = self.episode.hours_left if self.include_price: next_obs["price"] = self.episode.price next_obs["tariff"] = self.episode.tariff # normalize next observation norm_next_obs = self.normalizer.normalize_obs(next_obs) # Log soc for battery degradation if self.calc_deg: self.deg_data_logger.log_soc(self.episode.soc_deg) # for logging: calculate penalty amount, grid overloading in kW and percentage points of SOC violated penalty = reward - (cashflow * self.score_config.price_multiplier) grid = abs(overload_amount) soc_v = abs(cum_soc_missing) # Calculate degradation and state of health based on chosen method # calculate degradation once per day if self.calc_deg and ((self.episode.time.hour == 14) and (self.episode.time.minute == 45)): degradation = self.sei_deg.calculate_degradation(self.deg_data_logger.soc_log, self.load_calculation.evse_max_power, self.time_conf, self.ev_config.temperature) # calculate SOH from current degradation self.episode.soh = np.subtract(self.episode.soh, degradation) # calculate new resulting battery capacity after degradation self.episode.battery_cap = self.episode.soh * self.ev_config.init_battery_cap # otherwise set degradation to 0 for logging purposes else: degradation = 0.0 # log data if episode is not done, otherwise first observation of next episode would be returned if self.log_data and not self.episode.done: self.data_logger.log_data(self.episode.time, norm_next_obs, actions, reward, cashflow, penalty, grid, soc_v, degradation, self.charge_log, self.episode.soh) if not self.real_time: break if self.event_manager.check_event(self.episode): if self.print_updates: print("Relevant event recognised. Will pass to RL agent.") self.episode.events = 0 break # return according to openAI gym core API return norm_next_obs, reward, self.episode.done, False, self.info
[docs] def close(self): return None
[docs] def print(self, action): """ The print function can provide useful information of the environment dynamics and the agent's actions. Can slow down FPS due to the printing at each timestep :param action: Action of the agent :return: None -> Just prints information if specified """ print(f"Timestep: {self.episode.time}") if self.include_price: print(f"Total price with fees: {np.round(self.episode.price[0] / 1000, 3)} €/kWh") current_spot = self.db.loc[self.db["date"]==self.episode.time, "DELU"].values[0] print(f"Spot: {np.round(current_spot/1000, 3)} €/kWh") print(f"Tariff: {self.episode.tariff[0] / 1000} €/kWh") print(f"SOC: {np.round(self.episode.soc, 3)}, Time left: {self.episode.hours_left} hours") print(f"Action taken: {np.round(action, 3)}") print(f"Actual charging energy: {round(self.episode.total_charging_energy, 3)} kWh") print(f"Logging energy: {round(self.charge_log.sum(), 3)} kWh") print(f"Charging cost/revenue: {round(self.episode.current_charging_expense, 3)} €") print(f"SoH: {np.round(self.episode.soh, 3)}") print("--------------------------")
[docs] def render(self): if self.render_mode == "human": there = self.db["There"][self.db["date"] == self.episode.time].values kw = np.multiply(self.episode.current_actions, self.load_calculation.evse_max_power) soc = self.episode.soc if there is None: there = np.zeros(self.num_cars) kw = np.zeros(self.num_cars) soc = np.zeros(self.num_cars) self.pl_render.render(there=there, kw = kw, soc = soc)
# functions that can be called through vec_envs via env_method()
[docs] def get_log(self): """ This function can be called through SB3 vectorized environments via VecEnv.env_method("get_log")[0] The zero index is required so only the first element -> the DataFrame is extracted :return: Log dataframe """ return self.data_logger.log
[docs] def is_done(self): """ VecEnv.env_method("is_done")[0] :return: Flag is episode is done, bool """ # return if episode is done return self.episode.done
[docs] def get_start_time(self): """ VecEnv.env_method("get_start_time")[0] :return: pd.TimeStamp """ return self.episode.start_time
[docs] def set_start_time(self, start_time: str): """ VecEnv.env_method("set_start_time", [f"{start_time}"]) Must parse the function and argument of start_time :param start_time: string of pd.TimeStamp / date :return: None """ self.episode.start_time = start_time return None
[docs] def get_time(self): """ VecEnv.env_method("get_time")[0] :return: pd.TimeStamp: current timestamp """ return self.episode.time
[docs] def get_dist_factor(self): """ This function returns the distribution/laxity factor: how much time needed vs. how much time left at charger If factor is 0.1, the dist agent would only charge with 10% of the EVSE capacity. Call via env_method("get_dist_factor")[0] if using an SB3 Vectorized Environment :return: dist/laxity factor, float """ obs = self.observer.get_obs(self.db, self.time_conf.price_lookahead, self.time_conf.bl_pv_lookahead, self.episode.time, ev_conf=self.ev_config, load_calc=self.load_calculation, aux=self.aux_flag, target_soc=self.target_soc) return np.divide(obs["hours_needed"], np.add(obs["hours_left"], 0.001))
[docs] def choose_time_picker(self, time_picker): """ Chooses the right time picker based on the specified in input string. Static: Always the same time is picked to start an episode Random: Start an episode randomly from the training set Eval: Start an episode randomly from the validation set :param time_picker: (string), specifies which time picker to choose: "static", "eval", "random" :return: tp (TimePicker) -> time picker object """ # Load time picker module if time_picker == "static": # when an episode starts, this class picks the same starting time tp: TimePicker = StaticTimePicker() elif time_picker == "eval": # picks a random starting times from test set (nov - dez) tp: TimePicker = EvalTimePicker(self.time_conf.episode_length) elif time_picker == "random": # picks random starting times from training set (jan - oct) tp: TimePicker = RandomTimePicker() else: # must choose between static, eval or random raise TypeError("Time picker type not recognised") return tp
[docs] def choose_observer(self): """ This function chooses the right observer, depending on whether to include price, building, PV, etc. :return: obs (Observer) -> The observer module to choose """ # All observations are made in the observer class # not even price: only soc and time left if not self.include_price: obs: Observer = ObserverSocTimeOnly() # only price elif not self.include_building_load and not self.include_pv: obs: Observer = ObserverPriceOnly() # price and building load elif self.include_building_load and not self.include_pv: obs: Observer = ObserverWithBuildingLoad() # price and pv elif not self.include_building_load and self.include_pv: obs: Observer = ObserverWithPV() # price, building load and pv elif self.include_building_load and self.include_pv: obs: Observer = ObserverWithBoth() else: raise TypeError("Observer configuration not found. Recheck flags.") return obs
[docs] def detect_dim_and_bounds(self): """ This function chooses the right dimension of the observation space based on the chosen configuration. Each increase of dim is explained below. The low_obs and high_obs are built in the normalizer object, using the dim value that was calculated in this function. - set boundaries of the observation space, detects if normalized or not. - If aux flag is true, additional information enlarges the observation space. - The following code goes through all possible environment setups. - Depending on the setup, the dimensions differ and every case is handled differently. :return: low_obs and high_obs: tuple[float, float] | tuple[np.ndarray, np.ndarray] -> used for gym.Spaces """ if not self.include_price: dim = 2 * self.num_cars # soc and time left for each EV if self.aux_flag: dim += self.num_cars # there dim += self.num_cars # target soc dim += self.num_cars # charging left dim += self.num_cars # hours needed dim += self.num_cars # laxity dim += 1 # evse power dim += 6 # month, week, hour sin/cos low_obs, high_obs = self.normalizer.make_boundaries(dim) elif not self.include_building_load and not self.include_pv: dim = 2 * self.num_cars + (self.time_conf.price_lookahead + 1) * 2 if self.aux_flag: dim += self.num_cars # there dim += self.num_cars # target soc dim += self.num_cars # charging left dim += self.num_cars # hours needed dim += self.num_cars # laxity dim += 1 # evse power dim += 6 # month, week, hour sin/cos low_obs, high_obs = self.normalizer.make_boundaries(dim) elif self.include_building_load and not self.include_pv: dim = (2 * self.num_cars + (self.time_conf.price_lookahead + 1) * 2 + self.time_conf.bl_pv_lookahead + 1 ) if self.aux_flag: dim += self.num_cars # there dim += self.num_cars # target soc dim += self.num_cars # charging left dim += self.num_cars # hours needed dim += self.num_cars # laxity dim += 1 # evse power dim += 1 # grid cap dim += 1 # avail grid cap for charging dim += 1 # possible avg action per car dim += 6 # month, week, hour sin/co low_obs, high_obs = self.normalizer.make_boundaries(dim) elif not self.include_building_load and self.include_pv: dim = (2 * self.num_cars + (self.time_conf.price_lookahead + 1) * 2 + self.time_conf.bl_pv_lookahead + 1 ) if self.aux_flag: dim += self.num_cars # there dim += self.num_cars # target soc dim += self.num_cars # charging left dim += self.num_cars # hours needed dim += self.num_cars # laxity dim += 1 # evse power dim += 6 # month, week, hour sin/cos low_obs, high_obs = self.normalizer.make_boundaries(dim) elif self.include_building_load and self.include_pv: dim = (2 * self.num_cars # soc and time left + (self.time_conf.price_lookahead + 1) * 2 # price and tariff + 2 * (self.time_conf.bl_pv_lookahead + 1) # pv and building load ) if self.aux_flag: dim += self.num_cars # there dim += self.num_cars # target soc dim += self.num_cars # charging left dim += self.num_cars # hours needed dim += self.num_cars # laxity dim += 1 # evse power dim += 1 # grid cap dim += 1 # avail grid cap for charging dim += 1 # possible avg action per car dim += 6 # month, week, hour sin/cos low_obs, high_obs = self.normalizer.make_boundaries(dim) else: low_obs = None high_obs = None raise ValueError("Problem with environment setup. Check building and pv flags.") return low_obs, high_obs
[docs] def adjust_caretaker_lunch_soc(self): """ The caretaker target SOC can be set lower during the lunch break to avoid unfair penalties occurring. This is because the break is not long enough to charge until 0.85 target SOC. :return: None -> sets the target SOC during lunch break hours to 0.65 by default """ # make an adjustment for caretakers: the afternoon tour SOC on arrival should be calculated with the # afternoon target SOC. This is set to 0.65 in this case afternoon_trips = self.db.loc[((self.db["date"].dt.hour >= 0) & (self.db["date"].dt.hour <= 10)) | ((self.db["date"].dt.hour >= 15) & (self.db["date"].dt.hour <= 23))] self.db.loc[((self.db["date"].dt.hour >= 0) & (self.db["date"].dt.hour <= 10)) | ((self.db["date"].dt.hour >= 15) & (self.db["date"].dt.hour <= 23)), "SOC_on_return"] \ = (self.ev_config.target_soc_lunch - afternoon_trips["last_trip_total_consumption"].div(self.ev_config.init_battery_cap)) self.db.loc[self.db["There"] == 0, "SOC_on_return"] = 0
[docs] def auto_gen(self): """ This function automatically generates schedules as specified. Uses the ScheduleGenerator module. Note: this can take up to 1-3 hours, depending on the number of vehicles. :return: None -> The schedule is generated and placed in the input folder """ gen_sched = [] print("Generating schedules... This may take a while.") for i in range(self.gen_n_evs): self.schedule_gen = ScheduleGenerator(env_config=self.env_config, schedule_type=self.schedule_type, vehicle_id=str(i)) gen_sched.append(self.schedule_gen.generate_schedule()) complete_schedule = pd.concat(gen_sched) if not self.gen_name.endswith(".csv"): self.gen_name = self.gen_name + ".csv" complete_schedule.to_csv(os.path.join(self.path_name, self.gen_name)) print(f"Schedule generation complete. Saved in Inputs path. File name: {self.gen_name}") self.schedule_name = self.gen_name
[docs] def get_next_dt(self): """ Calculates the time delta from the current time step and the next one. This allows for csv input files that have irregular time intervals. Energy calculations will automatically adjust for the dynamic time differences through kWh = kW * dt :return: next time delta in hours """ current_time = self.episode.time next_time = self.db["date"][self.db["date"].searchsorted(current_time) + 1] delta = (next_time - current_time).total_seconds()/3600 return delta
[docs] def get_next_minutes(self): """ Calculates the integer of minutes until the next time step. This therefore limits the framework's current maximum resolution to discrete time steps of 1 min. This will be improved soon, as well as the dependency to know the future value beforehand. :return: Integer of minutes until next timestep """ current_time = self.episode.time next_time = self.db["date"][self.db["date"].searchsorted(current_time) + 1] delta = (next_time - current_time).total_seconds()/60 return int(delta)
[docs] def read_config(self, conf_path: str): with open(f'{conf_path}', 'r') as file: env_config = json.load(file) return env_config
[docs] def check_data_paths(self, input_path, schedule_path, spot_path, load_path, pv_path): schedule = os.path.join(input_path, schedule_path) if schedule_path is not None else None spot = os.path.join(input_path, spot_path) if spot_path is not None else None load = os.path.join(input_path, load_path) if load_path is not None else None pv = os.path.join(input_path, pv_path) if pv_path is not None else None for path in [schedule, spot, load, pv]: if path is not None: assert(os.path.isfile(path)), f"Path does not exist: {path}"
[docs] def specify_company_and_battery_size(self, use_case): # Specify company type and associated battery size in kWh if use_case == "ct": self.company = CompanyType.Caretaker self.schedule_type = ScheduleType.Caretaker self.ev_config.init_battery_cap = 16.7 elif use_case == "ut": self.company = CompanyType.Utility self.schedule_type = ScheduleType.Utility self.ev_config.init_battery_cap = 50.0 elif use_case == "lmd": self.company = CompanyType.Delivery self.schedule_type = ScheduleType.Delivery self.ev_config.init_battery_cap = 60.0 elif use_case == "custom": self.company = CompanyType.Custom self.schedule_type = ScheduleType.Custom self.ev_config.init_battery_cap = self.env_config["custom_ev_battery_size_in_kwh"] else: raise TypeError("Company not recognised.")
[docs] def change_markups(self): if self.env_config["spot_markup"] is not None: self.ev_config.fixed_markup = self.env_config["spot_markup"] if self.env_config["spot_mul"] is not None: self.ev_config.variable_multiplier = self.env_config["spot_mul"] if self.env_config["feed_in_ded"] is not None: self.ev_config.feed_in_deduction = self.env_config["feed_in_ded"]
[docs] def adjust_score_config(self): if self.env_config["ignore_price_reward"]: self.score_config.price_multiplier = 0 if self.env_config["ignore_overloading_penalty"]: self.score_config.penalty_overloading = 0 if self.env_config["ignore_invalid_penalty"]: self.score_config.penalty_invalid_action = 0 if self.env_config["ignore_overcharging_penalty"]: self.score_config.penalty_overcharging = 0