Source code for fleetrl.utils.data_processing.data_processing

import datetime
import os

import numpy as np
import pandas as pd

from fleetrl.fleet_env.config.ev_config import EvConfig
from fleetrl.fleet_env.config.time_config import TimeConfig


# this class contains all the necessary information from the vehicle and its schedule
# the schedule is imported from emobpy and manipulated so that the output data is in the right format

class DataLoader:
    """
    The DataLoader class handles the csv import and pre-processing of the timeseries information.
    Optimized pandas functions such as merge_asof are used to significantly speed up processing
    compared to loops. Cython could further improve this initial processing step. It only happens
    once when instantiating env objects.
    """

    def __init__(self, path_name, schedule_name,
                 spot_name, tariff_name,
                 building_name, pv_name,
                 time_conf: TimeConfig, ev_conf: EvConfig, target_soc,
                 building_flag, pv_flag,
                 real_time: bool):
        """
        Initial information that is required for loading data

        :param path_name: string pointing to the parent directory of input files
        :param schedule_name: string of the schedule csv, e.g. "1_LMD.csv"
        :param spot_name: string of the spot price csv
        :param tariff_name: string of the feed-in tariff csv
        :param building_name: building load csv
        :param pv_name: pv data csv
        :param time_conf: time config object
        :param ev_conf: ev config object
        :param target_soc: target soc
        :param building_flag: include/load building load flag
        :param pv_flag: include/load pv flag
        :param real_time: if True, the schedule is used at its native resolution and not resampled
        """

        # save the time_conf within DataLoader as well because it is used in some functions
        self.time_conf = time_conf

        # schedule import from csv
        # db = pd.read_excel(os.path.dirname(__file__) + '/test_simple.xlsx')
        self.schedule = pd.read_csv(os.path.join(path_name, schedule_name), parse_dates=["date"])

        # setting the index of the df to the date for resampling
        self.schedule.set_index("date", inplace=True, drop=False)

        # TODO: edge case of duplicate data still possible
        if not real_time:  # if real_time, data is not resampled and used as is
            # resampling the df. consumption and distance are summed, power rating mean like in emobpy
            # group by ID is needed so the different cars don't get overwritten (they have the same dates)
            # NB: up-sampling does not work here, would require filling the empty cells with new data (e.g. ffill)
            self.schedule = self.schedule.groupby("ID").resample(time_conf.freq).agg(
                {'Location': 'first',
                 'ID': 'first',
                 'Consumption_kWh': 'sum',
                 'ChargingStation': 'first',
                 'PowerRating_kW': 'mean',
                 'Distance_km': 'sum',
                 'date': 'first'})

        # resetting the index to a numerical value
        self.schedule.index = range(len(self.schedule))

        # compute / preprocess from loaded schedule
        self.compute_from_schedule(ev_conf, time_conf, target_soc)

        # create a date range with the chosen frequency
        # Given the desired frequency, create a (down-sampled) column of timestamps
        self.date_range = pd.DataFrame()
        if not real_time:
            # date_range according to model frequency
            self.date_range["date"] = pd.date_range(start=self.schedule["date"].min(),
                                                    end=self.schedule["date"].max(),
                                                    freq=time_conf.freq)
        else:
            # date_range according to dataset
            self.date_range["date"] = self.schedule["date"].unique()

        # load csv files
        self.spot_price = self.load_prices(path_name, spot_name, self.date_range)
        self.tariff = self.load_feed_in(path_name, tariff_name, self.date_range)

        if building_flag:
            self.building_load = self.load_building_load(path_name, building_name, self.date_range)

        if pv_flag:
            self.pv = self.load_pv(path_name, pv_name, self.date_range)

        if not building_flag and not pv_flag:
            self.db = pd.concat([self.schedule,
                                 self.spot_price["DELU"],
                                 self.tariff["tariff"]], axis=1)

        elif building_flag and not pv_flag:
            self.db = pd.concat([self.schedule,
                                 self.spot_price["DELU"],
                                 self.tariff["tariff"],
                                 self.building_load["load"]], axis=1)

        elif not building_flag and pv_flag:
            self.db = pd.concat([self.schedule,
                                 self.spot_price["DELU"],
                                 self.tariff["tariff"],
                                 self.pv["pv"]], axis=1)

        elif building_flag and pv_flag:
            self.db = pd.concat([self.schedule,
                                 self.spot_price["DELU"],
                                 self.tariff["tariff"],
                                 self.building_load["load"],
                                 self.pv["pv"]], axis=1)

        else:
            self.db = None
            raise RuntimeError("Problem with building database. Check building and PV flags.")
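    # Illustrative usage sketch (not part of the library; the file names, target SOC and flag
    # values below are assumptions for demonstration only):
    #
    #   time_conf = TimeConfig(...)  # e.g. quarter-hourly frequency
    #   ev_conf = EvConfig(...)
    #   loader = DataLoader(path_name="./inputs",
    #                       schedule_name="1_LMD.csv",
    #                       spot_name="spot.csv",
    #                       tariff_name="tariff.csv",
    #                       building_name="building_load.csv",
    #                       pv_name="pv.csv",
    #                       time_conf=time_conf, ev_conf=ev_conf,
    #                       target_soc=0.85,
    #                       building_flag=True, pv_flag=True,
    #                       real_time=False)
    #   db = loader.db  # combined dataframe later used by the environment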
    def compute_from_schedule(self, ev_conf, time_conf, target_soc):
        """
        This function pre-processes the input data and adds additional columns:
        a "There" flag, time left at charger, SOC on return, trip consumption, etc.
        merge_asof and vectorized operations are used for performance gains.

        :return: None
        """
        # new column with flag if EV is there or not
        self.schedule["There"] = (np.array(self.schedule["PowerRating_kW"] != 0, dtype=int))

        # make a new column in db that checks whether the charging station value changed
        self.schedule["Change"] = np.array(
            self.schedule["ChargingStation"] != self.schedule["ChargingStation"].shift(1), dtype=int)

        # create a group, so they can be easily grouped by change (home->none or none->home)
        self.schedule["Group"] = self.schedule["Change"].cumsum()

        # create a column for the total consumption that will later be populated
        self.schedule["TotalConsumption"] = np.zeros(len(self.schedule))

        # calculate the total consumption of a single trip by summing over a group
        # only choose the groups where the car is driving (=="none")
        # sum over the consumption
        # resetting the index and dropping the old group index
        consumption = (self.schedule.loc[self.schedule["ChargingStation"] == "none"]
                       .groupby('Group')["Consumption_kWh"].sum().reset_index(drop=True))

        trip_length = (self.schedule.loc[self.schedule["ChargingStation"] == "none"]
                       .groupby('Group')["date"].count().reset_index(drop=True))

        # get the last dates of each trip
        # resetting the index so the group index goes away
        last_dates = (self.schedule.loc[self.schedule["ChargingStation"] == "none"]
                      .groupby('Group')["date"].last().reset_index(drop=True))

        # get the first dates of each trip
        # resetting the index so the group index goes away
        departure_dates = (self.schedule.loc[self.schedule["ChargingStation"] == "none"]
                           .groupby('Group')["date"].first().reset_index(drop=True))

        # the return dates are on the next timestep
        # drop duplicates because the date is iterated through for each ev anyway
        return_dates = last_dates.add(datetime.timedelta(minutes=time_conf.minutes))

        # get the vehicle ids of the trips
        # resetting the index so the group index goes away
        # dropping duplicates because the loop iterates through both dates and ids anyway
        ids = (self.schedule.loc[self.schedule["ChargingStation"] == "none"]
               .groupby('Group')["ID"].first().reset_index(drop=True))

        # creating a dataframe for calculating the consumption, the info of ID and date is needed
        res_return = pd.DataFrame({"ID": ids, "consumption": consumption, "date": return_dates, "len": trip_length})

        # creating a dataframe for calculating the time_left, ID and date needed for pd.merge_asof()
        res_departure = pd.DataFrame({"ID": ids, "dep": departure_dates, "date": departure_dates})

        # match return dates with db, backwards fill, sort by ID, match on date
        merged_cons = pd.merge_asof(self.schedule.sort_values("date"),
                                    res_return.sort_values("date"),
                                    on="date",
                                    by="ID",
                                    direction="backward"
                                    )

        # sort the df into the right order and reset the index
        merged_cons = merged_cons.sort_values(["ID", "date"]).reset_index(drop=True)

        # set consumption to 0 where the car is not there because information is unknown to the agent at that point
        merged_cons.loc[merged_cons["There"] == 0, "consumption"] = 0
        # set trip length to 0 where the car is not there because information unknown to agent at that point
        merged_cons.loc[merged_cons["There"] == 0, "len"] = 0

        # fill NaN values
        merged_cons.fillna(0, inplace=True)

        # match departure dates with dates in db, forward direction, sort by ID, match on date
        merged_time_left = pd.merge_asof(self.schedule.sort_values("date"),
                                         res_departure.sort_values("date"),
                                         on="date",
                                         by="ID",
                                         direction="forward"
                                         )

        # reset the index
        merged_time_left = merged_time_left.sort_values(["ID", "date"]).reset_index(drop=True)

        # calculate time left with the correct index
        merged_time_left["time_left"] = (merged_time_left["dep"] - merged_time_left["date"]).dt.total_seconds() / 3600

        # time left is 0 when the car is not there
        merged_time_left.loc[merged_time_left["There"] == 0, "time_left"] = 0

        # fill NaN values
        merged_time_left.loc[:, "time_left"].fillna(0, inplace=True)

        # add computed information to db
        self.schedule["last_trip_total_consumption"] = merged_cons.loc[:, "consumption"]
        self.schedule["last_trip_total_length_hours"] = merged_cons.loc[:, "len"].div(self.time_conf.time_steps_per_hour)
        self.schedule["time_left"] = merged_time_left.loc[:, "time_left"]

        # create SOC column and populate with zeros
        # calculate SOC on return, assuming the previous trip charged to the target soc
        # maybe this could be changed in the future to make it more complex (future SOC depends on previous SOC)
        self.schedule["SOC_on_return"] = target_soc - self.schedule["last_trip_total_consumption"].div(
            ev_conf.init_battery_cap)
        self.schedule.loc[self.schedule["There"] == 0, "SOC_on_return"] = 0
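    # Worked example of the merge_asof logic above (values are made up for illustration):
    # a trip that departs at 08:00 and has its last "none" timestep at 11:45 with 5 kWh total
    # consumption produces one row in res_return with date=12:00 (return timestep) and
    # consumption=5. Matching the schedule against res_return with direction="backward" stamps
    # every timestep from 12:00 onwards (until the next return) with that trip's consumption;
    # timesteps where the car is away are afterwards reset to 0 because the agent cannot know
    # the consumption yet. Symmetrically, matching against res_departure with
    # direction="forward" gives each timestep the next departure date, from which the
    # remaining time at the charger ("time_left", in hours) is computed.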
    def load_prices_original(self, path_name, spot_name, date_range):
        """
        Load prices from csv

        :param path_name: Parent directory string
        :param spot_name: file name with .csv ending
        :param date_range: pd.date_range which was defined from the "date" column in the EV schedules.
            Note that the EV schedule dates therefore dictate the model's timeframe.
        :return: spot price dataframe
        """

        # load csv
        spot = pd.read_csv(os.path.join(path_name, spot_name), delimiter=";", decimal=",")

        # drop price information of other countries
        spot = spot.drop(columns=spot.columns[4:20])

        # put the date in the same format as the schedule
        spot = spot.rename(columns={"Date": "date"})
        spot["date"] = spot["date"] + " " + spot["Start"] + ":00"
        spot["date"] = pd.to_datetime(spot["date"], format="mixed")

        # rename column for accessibility
        spot = spot.rename(columns={"Deutschland/Luxemburg [€/MWh] Original resolutions": "DELU"})

        spot = self._date_checker(df=spot, date_range=date_range)

        # TODO test if this also works for down-sampling. Right now this up-samples from hourly to quarter-hourly
        spot_price = pd.merge_asof(date_range,
                                   spot.sort_values("date"),
                                   on="date",
                                   direction="backward"
                                   )

        # return the spot price at the right granularity
        return spot_price
    def load_prices(self, path_name, spot_name, date_range):
        """
        Load prices from csv

        :param path_name: Parent directory string
        :param spot_name: file name with .csv ending
        :param date_range: pd.date_range which was defined from the "date" column in the EV schedules.
            Note that the EV schedule dates therefore dictate the model's timeframe.
        :return: spot price dataframe
        """

        # load csv
        spot = pd.read_csv(os.path.join(path_name, spot_name), delimiter=";", decimal=",", parse_dates=["date"])

        # drop price information of other countries
        spot = spot.drop(columns=spot.columns[4:20])

        # put the date in the same format as the schedule
        # spot["date"] = pd.to_datetime(spot["date"], format="mixed")

        # rename column for accessibility
        spot = spot.rename(columns={"Deutschland/Luxemburg [€/MWh] Original resolutions": "DELU"})

        # check that the years are the same
        # otherwise change the year and fix leap year problems
        spot = self._date_checker(df=spot, date_range=date_range)

        # TODO test if this also works for down-sampling. Right now this up-samples from hourly to quarter-hourly
        spot_price = pd.merge_asof(date_range,
                                   spot.sort_values("date"),
                                   on="date",
                                   direction="backward"
                                   )

        # return the spot price at the right granularity
        return spot_price
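    # Note on the merge_asof up-sampling above: with direction="backward" every timestamp in
    # date_range takes the most recent earlier (or equal) price, so an hourly price series is
    # simply repeated for each quarter-hour step. Illustrative (made-up) values:
    #   spot:        10:00 -> 50.0, 11:00 -> 60.0   [€/MWh]
    #   date_range:  10:00, 10:15, 10:30, 10:45, 11:00
    #   result:      50.0,  50.0,  50.0,  50.0,  60.0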
    def load_feed_in(self, path_name, tariff_name, date_range):
        """
        Load feed-in from csv

        :param path_name: Parent directory string
        :param tariff_name: file name with .csv ending
        :param date_range: pd.date_range which was defined from the "date" column in the EV schedules.
            Note that the EV schedule dates therefore dictate the model's timeframe.
        :return: tariff dataframe
        """

        # load csv
        df = pd.read_csv(os.path.join(path_name, tariff_name), delimiter=";", decimal=",", parse_dates=["date"])

        df = self._date_checker(df=df, date_range=date_range)

        tariff = pd.merge_asof(date_range,
                               df.sort_values("date"),
                               on="date",
                               direction="backward"
                               )

        # return the tariff at the right granularity
        return tariff
    def load_building_load(self, path_name, file_name, date_range):
        """
        Load building load from csv

        :param path_name: Parent directory string
        :param file_name: file name with .csv ending
        :param date_range: pd.date_range which was defined from the "date" column in the EV schedules.
            Note that the EV schedule dates therefore dictate the model's timeframe.
        :return: load dataframe
        """

        b_load = pd.read_csv(os.path.join(path_name, file_name), delimiter=",", parse_dates=["date"])
        # b_load["date"] = pd.to_datetime(b_load["date"], format="mixed")

        b_load = self._date_checker(df=b_load, date_range=date_range)

        # TODO test if this also works for down-sampling. Right now this up-samples from hourly to quarter-hourly
        building_load = pd.merge_asof(date_range,
                                      b_load.sort_values("date"),
                                      on="date",
                                      direction="backward"
                                      )

        # return building load at right granularity
        return building_load
    def load_pv(self, path_name, pv_name, date_range):
        """
        Load pv from csv

        :param path_name: Parent directory string
        :param pv_name: file name with .csv ending
        :param date_range: pd.date_range which was defined from the "date" column in the EV schedules.
            Note that the EV schedule dates therefore dictate the model's timeframe.
        :return: pv dataframe
        """

        pv = pd.read_csv(os.path.join(path_name, pv_name), delimiter=",", decimal=",", parse_dates=["date"])
        # pv["date"] = pd.to_datetime(pv["date"], format="mixed")
        pv["pv"] = pv["pv"].astype(float)

        pv = self._date_checker(df=pv, date_range=date_range)

        pv = pd.merge_asof(date_range,
                           pv.sort_values("date"),
                           on="date",
                           direction="backward")

        # return pv generation
        return pv
    @staticmethod
    def shape_price_reward(db: pd.DataFrame, ev_conf: EvConfig):
        """
        - de-trend prices, so they can be used as a reward function
        - agent should not be penalised more if the general price level is higher
        - instead, the agent should just focus on price fluctuations and exploit them
        - computing average for whole year, split data into monthly chunks
        - offset monthly chunks, such that the monthly average = annual average
        - this corrects for absolute price increases, but leaves fluctuations intact

        :param db: database with schedules, pv, prices, load, dataframe
        :param ev_conf: ev config object
        :return: db with updated, de-trended prices
        """

        price = db["DELU"].dropna()
        price = price.add(ev_conf.fixed_markup)
        price = price.mul(ev_conf.variable_multiplier)
        price_total_avg = price.mean()
        price.index = db.loc[db["ID"] == 0, "date"]
        resampled_price = price.resample("M")

        result = pd.DataFrame()
        for name, group in resampled_price:
            chunk_avg = group.mean()
            offset_prices = group - chunk_avg + price_total_avg
            result = pd.concat([result, offset_prices])
        result.columns = ["price_reward_curve"]
        result = result.reset_index()
        db = pd.concat((db, result["price_reward_curve"]), axis=1)

        tariff = db["tariff"].dropna()
        tariff = tariff.mul(1 - ev_conf.feed_in_deduction)
        tariff_total_avg = tariff.mean()
        tariff.index = db.loc[db["ID"] == 0, "date"]
        resampled_tariff = tariff.resample("M")

        result = pd.DataFrame()
        for name, group in resampled_tariff:
            chunk_avg = group.mean()
            offset_tariff = group - chunk_avg + tariff_total_avg
            result = pd.concat([result, offset_tariff])
        result.columns = ["tariff_reward_curve"]
        result = result.reset_index()
        db = pd.concat((db, result["tariff_reward_curve"]), axis=1)

        return db
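    # Worked numeric example of the de-trending above (made-up numbers): if the annual average
    # price is 80 €/MWh and one month averages 100 €/MWh, every price in that month is shifted
    # by -100 + 80 = -20 €/MWh. A spike of 130 €/MWh in that month becomes 110 €/MWh, so the
    # intra-month fluctuation (+30 around the monthly mean) is preserved while the absolute
    # level change is removed. The same offsetting is applied to the feed-in tariff.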
    @staticmethod
    def _date_checker(df: pd.DataFrame, date_range: pd.DataFrame) -> pd.DataFrame:
        input_start_year = df.iloc[0]["date"].year
        date_range_start_year = date_range.iloc[0][0].year
        if input_start_year != date_range_start_year:
            print("Start year of input data doesn't match simulation data range. Adjusting...")
            df["date"] = df["date"] + pd.DateOffset(years=date_range_start_year - input_start_year)
        assert df.iloc[0]["date"] == date_range.iloc[0][0], "Invalid start time."
        assert df.iloc[-1]["date"].year == date_range.iloc[-1][0].year, "Invalid end year."
        return df
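    # Illustrative behaviour of _date_checker (made-up years): if an input csv starts on
    # 2020-01-01 but the schedule-driven date_range starts on 2022-01-01, every timestamp in
    # the csv is shifted forward by pd.DateOffset(years=2). The asserts then verify that the
    # shifted data starts exactly at the simulation start and ends in the same year as the
    # date_range.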