Source code for fleetrl.utils.normalization.oracle_normalization

import numpy as np
import pandas as pd

from fleetrl.utils.normalization.unit_normalization import Normalization
from fleetrl.utils.load_calculation.load_calculation import LoadCalculation
from fleetrl.fleet_env.config.ev_config import EvConfig

# This normalizes based on the global maximum values. These could be in the future, hence the oracle prefix.
# For more realistic approaches, a rolling average could be used, or the sb3 vec normalize function
[docs] class OracleNormalization(Normalization): """ Oracle Normalization assumes the knowledge of the max and min values of the dataset. This is necessary to perform a global min/max normalization. Alternatively, a rolling normalization could be implemented. """ def __init__(self, db, building_flag, pv_flag, price_flag, ev_conf: EvConfig, load_calc: LoadCalculation, aux: bool): """ Initialize max and min values of the dataset, globally. :param db: Database dataframe :param building_flag: Include building load flag :param pv_flag: Include PV flag :param price_flag: Include price flag :param ev_conf: EV configuration object :param load_calc: Load calculation object :param aux: Flag whether to include auxiliary observations or not (bool) """ self.max_time_left = max(db["time_left"]) self.max_price = (max(db["DELU"]) + ev_conf.fixed_markup) * ev_conf.variable_multiplier self.min_price = (min(db["DELU"]) + ev_conf.fixed_markup) * ev_conf.variable_multiplier self.max_tariff = (max(db["tariff"])) * (1 - ev_conf.feed_in_deduction) self.min_tariff = (min(db["tariff"])) * (1 - ev_conf.feed_in_deduction) self.building_flag = building_flag self.pv_flag = pv_flag self.price_flag = price_flag self.aux = aux if self.building_flag: self.max_building = max(db["load"]) if self.pv_flag: self.max_pv = max(db["pv"]) if self.aux: self.max_soc = ev_conf.target_soc self.max_hours_needed = ((ev_conf.target_soc * ev_conf.init_battery_cap) /(load_calc.evse_max_power * ev_conf.charging_eff)) self.max_laxity = 5 self.max_evse = load_calc.evse_max_power self.max_grid = load_calc.grid_connection
[docs] def normalize_obs(self, input_obs: dict) -> np.ndarray: """ Normalization function. Different cases are checked depending on the flags of PV, load, price, and aux. Input observations are a dictionary with clear namings, to make further changes in the code easy and readable. :param input_obs: Input observation: Un-normalized observations as specified in the observer. :return: Normalized observation. """ # normalization is done here, so if the rule is changed it is automatically adjusted in step and reset input_obs["soc"] = (input_obs["soc"]) # soc is already normalized input_obs["hours_left"] = list(np.divide(input_obs["hours_left"], self.max_time_left)) # max hours of entire db # normalize spot price between 0 and 1, there are negative values # formula: z_i = (x_i - min(x))/(max(x) - min(x)) if self.price_flag: input_obs["price"] = list(np.divide(np.subtract(input_obs["price"], self.min_price), np.subtract(self.max_price, self.min_price))) input_obs["tariff"] = list(np.divide(np.subtract(input_obs["tariff"], self.min_tariff), np.subtract(self.max_tariff, self.min_tariff))) if not self.price_flag: output_obs = np.array(self.flatten_obs(input_obs), dtype=np.float32) if self.aux: input_obs["there"] = list(np.divide(input_obs["there"], 1)) # there input_obs["target_soc"] = list(np.divide(input_obs["target_soc"], self.max_soc)) # target soc input_obs["charging_left"] = list(np.divide(input_obs["charging_left"], self.max_soc)) # charging left input_obs["hours_needed"] = list(np.divide(input_obs["hours_needed"], self.max_hours_needed)) # hours needed input_obs["laxity"] = list(np.divide(input_obs["laxity"], self.max_laxity)) # laxity input_obs["evse_power"] = list(np.divide(input_obs["evse_power"], self.max_evse)) # evse power output_obs = np.array(self.flatten_obs(input_obs), dtype=np.float32) elif not self.building_flag and not self.pv_flag: output_obs = np.concatenate( (input_obs["soc"], input_obs["hours_left"], input_obs["price"], input_obs["tariff"]), dtype=np.float32) if self.aux: input_obs["there"] = list(np.divide(input_obs["there"], 1)) # there input_obs["target_soc"] = list(np.divide(input_obs["target_soc"], self.max_soc)) # target soc input_obs["charging_left"] = list(np.divide(input_obs["charging_left"], self.max_soc)) # charging left input_obs["hours_needed"] = list(np.divide(input_obs["hours_needed"], self.max_hours_needed)) # hours needed input_obs["laxity"] = list(np.divide(input_obs["laxity"], self.max_laxity)) # laxity input_obs["evse_power"] = list(np.divide(input_obs["evse_power"], self.max_evse)) # evse power # input obs of month, week and hour sin/cos don't need to be normalized output_obs = np.array(self.flatten_obs(input_obs), dtype=np.float32) elif self.building_flag and not self.pv_flag: input_obs["building_load"] = list(np.divide(input_obs["building_load"], self.max_building)) output_obs = np.concatenate( (input_obs["soc"], input_obs["hours_left"], input_obs["price"], input_obs["tariff"], input_obs["building_load"] ), dtype=np.float32) if self.aux: input_obs["there"] = list(np.divide(input_obs["there"], 1)) # there input_obs["target_soc"] = list(np.divide(input_obs["target_soc"], self.max_soc)) # target soc input_obs["charging_left"] = list(np.divide(input_obs["charging_left"], self.max_soc)) # charging left input_obs["hours_needed"] = list(np.divide(input_obs["hours_needed"], self.max_hours_needed)) # hours needed input_obs["laxity"] = list(np.divide(input_obs["laxity"], self.max_laxity)) # laxity input_obs["evse_power"] = list(np.divide(input_obs["evse_power"], self.max_evse)) # evse power input_obs["grid_cap"] = list(np.divide(input_obs["grid_cap"], self.max_grid)) # grid connection input_obs["avail_grid_cap"] = list(np.divide(input_obs["avail_grid_cap"], self.max_grid)) # available grid input_obs["possible_avg_action"] = list(np.divide(input_obs["possible_avg_action"], 1)) # possible avg action per car # input obs of month, week and hour sin/cos don't need to be normalized output_obs = np.array(self.flatten_obs(input_obs), dtype=np.float32) elif not self.building_flag and self.pv_flag: input_obs["building_load"] = list(np.divide(input_obs["building_load"], self.max_building)) output_obs = np.concatenate( (input_obs["soc"], input_obs["hours_left"], input_obs["price"], input_obs["tariff"], input_obs["pv"] ), dtype=np.float32) if self.aux: input_obs["there"] = list(np.divide(input_obs["there"], 1)) # there input_obs["target_soc"] = list(np.divide(input_obs["target_soc"], self.max_soc)) # target soc input_obs["charging_left"] = list(np.divide(input_obs["charging_left"], self.max_soc)) # charging left input_obs["hours_needed"] = list(np.divide(input_obs["hours_needed"], self.max_hours_needed)) # hours needed input_obs["laxity"] = list(np.divide(input_obs["laxity"], self.max_laxity)) # laxity input_obs["evse_power"] = list(np.divide(input_obs["evse_power"], self.max_evse)) # evse power # input obs of month, week and hour sin/cos don't need to be normalized output_obs = np.array(self.flatten_obs(input_obs), dtype=np.float32) elif self.building_flag and self.pv_flag: input_obs["building_load"] = list(np.divide(input_obs["building_load"], self.max_building)) input_obs["pv"] = list(np.divide(input_obs["pv"], self.max_pv)) output_obs = np.concatenate( (input_obs["soc"], input_obs["hours_left"], input_obs["price"], input_obs["tariff"], input_obs["building_load"], input_obs["pv"] ), dtype=np.float32) if self.aux: input_obs["there"] = list(np.divide(input_obs["there"], 1)) # there input_obs["target_soc"] = list(np.divide(input_obs["target_soc"], self.max_soc)) # target soc input_obs["charging_left"] = list(np.divide(input_obs["charging_left"], self.max_soc)) # charging left input_obs["hours_needed"] = list(np.divide(input_obs["hours_needed"], self.max_hours_needed)) # hours needed input_obs["laxity"] = list(np.divide(input_obs["laxity"], self.max_laxity)) # laxity input_obs["evse_power"] = list(np.divide(input_obs["evse_power"], self.max_evse)) # evse power input_obs["grid_cap"] = list(np.divide(input_obs["grid_cap"], self.max_grid)) # grid connection input_obs["avail_grid_cap"] = list(np.divide(input_obs["avail_grid_cap"], self.max_grid)) # available grid input_obs["possible_avg_action"] = list(np.divide(input_obs["possible_avg_action"], 1)) # possible avg action per car # input obs of month, week and hour sin/cos don't need to be normalized output_obs = np.array(self.flatten_obs(input_obs), dtype=np.float32) else: output_obs = None raise RuntimeError("Problem with included information. Check building and PV flags.") return output_obs
[docs] def make_boundaries(self, dim: tuple[int]) -> tuple[float, float] | tuple[np.ndarray, np.ndarray]: """ The boundaries are 0 and 1 because the observations are min/max normalized. :param dim: Dimension of the observation depending on the flags :return: Low and high observation arrays for gym.Spaces. """ low_obs = np.zeros(dim, dtype=np.float32) high_obs = np.ones(dim, dtype=np.float32) return low_obs, high_obs
[docs] @staticmethod def flatten_obs(obs): """ Observations must be flattened for openAI gym compatibility. The parsed observation must be a flat array and not a dictionary. The dictionary either includes float or array. The function removes the nesting. :param obs: Normalized observation dictionary :return: A flattened array - necessary for the RL algorithms to be in a 1-dim array e.g. [v_1, ..., v_N] """ flattened_obs = [v if isinstance(v, list) else [v] for v in obs.values()] flattened_obs = [item for sublist in flattened_obs for item in sublist] return flattened_obs