Source code for enlopy.analysis

from __future__ import division
import numpy as np
import pandas as pd

from .utils import clean_convert

__all__ = ['reshape_timeseries', 'get_LDC', 'get_load_archetypes', 'get_load_stats', 'detect_outliers']

[docs]def reshape_timeseries(Load, x='dayofyear', y=None, aggfunc='sum'): """Returns a reshaped pandas DataFrame that shows the aggregated load for selected timeslices. e.g. time of day vs day of year Parameters: Load (pd.Series, np.ndarray): timeseries x (str): x axis aggregator. Has to be an accessor of pd.DatetimeIndex (year, dayoftime, week etc.) y (str): similar to above for y axis Returns: reshaped pandas dataframe according to x,y """ # Have to convert to dataframe in order for pivottable to work # 1D, Dataframe a = clean_convert(Load.copy(), force_timed_index=True, always_df=True) a.name = 0 if len(a.columns) > 1: raise ValueError('Works only with 1D') if x is not None: a[x] = getattr(a.index, x) if y is not None: a[y] = getattr(a.index, y) a = a.reset_index(drop=True) return a.pivot_table(index=x, columns=y, values=a.columns[0], aggfunc=aggfunc).T
[docs]def get_LDC(Load, x_norm=True, y_norm=False): r"""Generates the Load Duration Curve based on a given load. For 2-dimensional dataframes the x-axis sorting is done based on sum of all series. Sorting on the y-axis is done based on the coefficient of variance. Arguments: Load (pd.Series): timeseries x_norm (bool): Normalize x axis (0,1) y_norm (bool): Normalize y axis (0,1) Returns: np.ndarray: tuple (x, y) ready for plotting (e.g. plt(\*LDC_load(load))) """ Load1 = clean_convert(Load,force_timed_index=False) if Load1.ndim >= 2: # Sort x axis by total value sorted_ind = Load1.sum(axis=1).sort_values(ascending=False).index # Sort series by variance coefficient. Baseline load should have smaller CV and should go lower sorted_cols = (Load1.std()/Load1.mean()).sort_values().index y = Load1.loc[sorted_ind, sorted_cols].values else: y = Load1.sort_values(ascending=False).values x = np.arange(1, len(y) + 1 ) if x_norm: x = x / len(x) if y_norm: y = y / y.max() return x, y
# remove nan because histogram does not work #load_masked = Load[~np.isnan(Load)] #n, xbins = np.histogram(load_masked, bins=bins, density=True) # xbins = xbins[:-1] #remove last element to make equal size #cum_values = np.zeros(xbins.shape) #cum_values[1:] = np.cumsum(n*np.diff(xbins)) #out = np.array([1-cum_values, xbins]) # out = np._r[[1 0], out] # Add extra point #if trunc_0: # Trunc non zero elements # out[out < 0] = 0 #return out
[docs]def get_load_archetypes(Load, k=2, x='hour', y='dayofyear', plot_diagnostics=False): """Extract typical load profiles using k-means and vector quantization. the time scale of archetypes depend on the selected dimensions (x,y). For the default values daily archetypes will be extracted. Parameters: Load (pd.Series): timeseries k (int): number of archetypes to identify and extract x (str): This will define how the timeseries will be grouped by. Has to be an accessor of pd.DatetimeIndex y (str): similar to above for y axis. plot_diagnostics (bool): If true a figure is plotted showing an overview of the results Returns: np.ndarray: dimensions (k, len(x)) """ from scipy.cluster.vq import whiten, kmeans, vq df = reshape_timeseries(Load, x=x, y=y, aggfunc='mean').astype(float) df_white = whiten(df) clusters_center, __ = kmeans(df_white, k) clusters_center_dewhitened = clusters_center.T * np.array([df.std(), ] * k ).T if plot_diagnostics: try: import matplotlib.pyplot as plt clusters, _ = vq(df_white, clusters_center) cm = _n_colors_from_colormap(k) ax1 = df.T.plot(legend=False, alpha=.1, color=[cm[i] for i in clusters]) # Add colored cluster centers as lines ax1.set_prop_cycle('color', cm) ax1.plot(clusters_center_dewhitened, linewidth=3, linestyle='--') plt.figure() # FIXME: works only with weekdays day_clusters = pd.DataFrame({y: Load.resample('d').mean().index.weekday, 'clusters': clusters, 'val': 1}) x_labels = "Mon Tue Wed Thu Fri Sat Sun".split() day_clusters.pivot_table(columns=y, index='clusters', aggfunc='count').T.plot.bar(stacked=True) plt.gca().set_xticklabels(x_labels) except Exception: #FIXME: specify exception print ('Works only with daily profile clustering') return clusters_center_dewhitened
[docs]def get_load_stats(Load, per='a'): """Find load profile characteristics. Among other it estimates: peak, load factor, base load factor, operating hours, Arguments: Load: timeseries of load to be examined. A timeseries index is needed. per: reporting periods. Annual by default. Based on pandas time offsets Returns: dict: Parameter dictionary """ #TODO 2D from .stats import all_stats_desc Load1 = clean_convert(Load, force_timed_index=True, freq='h') g = Load1.groupby(pd.Grouper(freq=per)) if len(g) > 100: print ('Warning: Too many periods ({}) selected'.format(len(g))) p_dict = {} for period, load_per in g: ind = str(period.to_period()) p_dict[ind] = {k: v(load_per) for k, v in all_stats_desc.items()} # named tuple instead of dict? return pd.DataFrame.from_dict(p_dict)
def _n_colors_from_colormap(n, cmap='Set1'): """ Returns lists of color tuples(RGBA) n: number of colours cmap: matplotlib colormap """ from matplotlib.cm import get_cmap cm = get_cmap(cmap) return [cm(1.*i/n) for i in range(n)]
[docs]def detect_outliers(Load, threshold=None, window=5, plot_diagnostics=False): """ Detect and optionally remove outliers based on median rolling window filtering. Inspired by https://ocefpaf.github.io/python4oceanographers/blog/2015/03/16/outlier_detection/ Arguments: Load: input timeseries threshold: if None then 3 sigma is selected as threshold window: how many values to check plot_diagnostics: Plot diagnostics to check whether the outliers were removed accurately Return: index position of detected outliers """ # TODO : Clean zero values (interpolate) a = Load.rolling(window=window, center=True).median().fillna(method='bfill').fillna(method='ffill') difference = np.abs(Load - a) if threshold is None: threshold = 3 * np.std(Load) outlier_idx = difference > threshold if plot_diagnostics: if len(outlier_idx > 0): kw = dict(marker='o', linestyle='none', color='r', alpha=0.5) Load.plot() Load[outlier_idx].plot(**kw) else: print('No outliers detected. If you think that there are, try to raise the threshold') return outlier_idx
def countweekend_days_per_month(df, weekdays=True): #TODO generalize count_x_per_y """Count number of occurrences where the day is Saturday or Sunday. Loops per month""" from collections import Counter out = [] for __, values in df.index.groupby(df.index.month).items(): m_count = Counter([day.weekday() for day in values]) out.append(m_count[5] + m_count[6]) # 5,6 is saturday sunday return out