Source code for pyphi.batch

# -*- coding: utf-8 -*-
"""


Created on Mon Apr 11 14:58:35 2022

Batch data is assumed to come in an excel file 
with first column being batch identifier and following columns
being process variables.
Optionally the second column labeled 'PHASE','Phase' or 'phase' indicating
the phase of exceution

Change log:
* added Dec 28 2023  Titles can be sent to contribution plots via plot_title flag    
                     Monitoring diagnostics are also plotted against sample starting with 1

* added Dec 27 2023  Corrected plots to use correct xaxis starting with sample =1 
                     Ammended indicator variable alignment not to replace the IV with a linear sequence but to keep orginal data
                     
* added Dec 4  2023  Added a BatchVIP calculation
* added Apr 23 2023  Corrected a very dumb mistake I made coding when tired    
    
* added Apr 18 2023 Added descriptors routine to obtain landmarks of the batch
                    such as min,max,ave of a variable [during a phase if indicated so]    
                    Modifed plot_var_all_batches to plot against the values in a Time column
                    and also add the legend for the BatchID
                    
* added Apr 10 2023 Added batch contribution plots
                    Added build_rel_time to create a tag of relative 
                    run time from a timestamp
                    
* added Apr 7 2023  Added alignment using indicator variable per phase


* added Apr 5 2023  Added the capability to monitor a variable in "Soft Sensor" mode
                    which implies there are no measurements for it (pure prediction)
                    as oppose to a forecast where there are new measurments coming in time.
                    
* added Jul 20 2022 Distribution of number of samples per phase plot
* added Aug 10 2022 refold_horizontal | clean_empty_rows | predict 
* added Aug 12 2022 replicate_batch

@author: S. Garcia-Munoz sgarciam@ic.ak.uk salg@andrew.cmu.edu
"""
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import matplotlib as mpl
from pyphi import calc as phi

# Sequence of color blind friendly colors.
cb_color_seq=['b','r','m','navy','bisque','silver','aqua','pink','gray']
mpl.rcParams['axes.prop_cycle'] = mpl.cycler(color=cb_color_seq) 

[docs] def unique(df,colid): """Return unique values from a DataFrame column, preserving order of first occurrence. A replacement for ``np.unique`` that does not sort the result, returning values in the order they first appear in the DataFrame. Args: df (pd.DataFrame): Input DataFrame. colid (str): Name of the column to extract unique values from. Returns: list: Unique values in the order they first appear in ``df[colid]``. """ aux=df.drop_duplicates(subset=colid,keep='first') unique_entries=aux[colid].values.tolist() return unique_entries
[docs] def mean(X,axis): """Compute the mean of a 2-D array along an axis, ignoring NaN values. Args: X (np.ndarray): 2-D input array, may contain ``np.nan``. axis (int): Axis along which to compute the mean. ``0`` = column-wise (mean of each column across rows). ``1`` = row-wise (mean of each row across columns). Returns: np.ndarray: 1-D array of mean values, with NaN entries excluded from the denominator so results remain unbiased in the presence of missing data. """ X_nan_map = np.isnan(X) X_ = X.copy() if X_nan_map.any(): X_nan_map = X_nan_map*1 X_[X_nan_map==1] = 0 #Calculate mean without accounting for NaN' if axis==0: aux = np.sum(X_nan_map,axis=0) x_mean = np.sum(X_,axis=0,keepdims=1)/(np.ones((1,X_.shape[1]))*X_.shape[0]-aux) else: aux = np.sum(X_nan_map,axis=1) x_mean = np.sum(X_,axis=1,keepdims=1)/(np.ones((X_.shape[0],1))*X_.shape[1]-aux) else: x_mean = np.mean(X_,axis=axis,keepdims=1) return x_mean.reshape(-1)
[docs] def simple_align(bdata,nsamples): """Align batch data to a common length by linear interpolation on row index. Resamples every batch to exactly ``nsamples`` rows by linearly interpolating each variable against the original row sequence. No phase information is used; all samples are treated as a single continuous trajectory. Args: bdata (pd.DataFrame): Batch data. First column is batch ID; second column may optionally be a phase label (``'Phase'``, ``'phase'``, or ``'PHASE'``); remaining columns are process variables. Batches are stacked vertically. nsamples (int): Target number of samples per batch after alignment. Returns: pd.DataFrame: Aligned batch data with all batches resampled to ``nsamples`` rows. Phase labels (if present) are mapped to the nearest original sample using rounded interpolation indices. """ bdata_a=[] if (bdata.columns[1]=='PHASE') or \ (bdata.columns[1]=='phase') or \ (bdata.columns[1]=='Phase'): phase=True else: phase = False aux=bdata.drop_duplicates(subset=bdata.columns[0],keep='first') unique_batches=aux[aux.columns[0]].values.tolist() for b in unique_batches: data_=bdata[bdata[bdata.columns[0]]==b] indx=np.arange(data_.shape[0]) new_indx=np.linspace(0,data_.shape[0]-1,nsamples) bname_rs=[data_[bdata.columns[0]].values[0]]*nsamples if phase: phase_list=data_[bdata.columns[1]].values.tolist() roundindx=np.round(new_indx).astype('int').tolist() phase_rs=[] for i in roundindx: phase_rs.append(phase_list[i]) vals=data_.values[:,2:] cols=data_.columns[2:] else: vals=data_.values[:,1:] cols=data_.columns[1:] vals_rs=np.zeros((nsamples,vals.shape[1])) for i in np.arange(vals.shape[1]): vals_rs[:,i]=np.interp(new_indx,indx.astype('float'),vals[:,i].astype('float')) df_=pd.DataFrame(vals_rs,columns=cols) if phase: df_.insert(0,bdata.columns[1],phase_rs) df_.insert(0,bdata.columns[0],bname_rs) bdata_a.append(df_) bdata_a=pd.concat(bdata_a) return bdata_a
[docs] def phase_simple_align(bdata,nsamples): """Align batch data to a common length per phase by linear interpolation. Resamples each phase of each batch independently to the specified number of samples, then concatenates phases back in order. Requires phase information in the second column. Args: bdata (pd.DataFrame): Batch data. First column is batch ID; second column must be a phase label (``'Phase'``, ``'phase'``, or ``'PHASE'``); remaining columns are process variables. Batches are stacked vertically. nsamples (dict): Number of samples to generate per phase. Keys must match the phase labels in the data. Resampling is linear with respect to row number within each phase. Example: ``{'Heating': 100, 'Reaction': 200, 'Cooling': 10}``. Returns: pd.DataFrame: Aligned batch data with each phase resampled to the specified number of samples, phases concatenated in key order of ``nsamples``. """ bdata_a=[] if (bdata.columns[1]=='PHASE') or \ (bdata.columns[1]=='phase') or \ (bdata.columns[1]=='Phase'): phase=True else: phase = False if phase: aux=bdata.drop_duplicates(subset=bdata.columns[0],keep='first') unique_batches=aux[aux.columns[0]].values.tolist() for b in unique_batches: data_=bdata[bdata[bdata.columns[0]]==b] vals_rs=[] bname_rs=[] phase_rs=[] firstone=True for p in nsamples.keys(): p_data= data_[data_[data_.columns[1]]==p] samps=nsamples[p] indx=np.arange(p_data.shape[0]) new_indx=np.linspace(0,p_data.shape[0]-1,samps) bname_rs_=[p_data[p_data.columns[0]].values[0]]*samps phase_rs_=[p_data[p_data.columns[1]].values[0]]*samps vals=p_data.values[:,2:] cols=p_data.columns[2:] vals_rs_=np.zeros((samps,vals.shape[1])) for i in np.arange(vals.shape[1]): vals_rs_[:,i]=np.interp(new_indx,indx.astype('float'),vals[:,i].astype('float')) if firstone: vals_rs=vals_rs_ firstone=False else: vals_rs=np.vstack((vals_rs,vals_rs_)) bname_rs.extend(bname_rs_) phase_rs.extend(phase_rs_) df_=pd.DataFrame(vals_rs,columns=cols) df_.insert(0,bdata.columns[1],phase_rs) df_.insert(0,bdata.columns[0],bname_rs) bdata_a.append(df_) bdata_a=pd.concat(bdata_a) return bdata_a
[docs] def phase_iv_align(bdata,nsamples): """Align batch data using an indicator variable (IV) or row index, per phase. Provides the most flexible alignment: each phase can be aligned either by linear resampling (default) or by using a monotonically changing process variable (indicator variable) as the alignment axis. Args: bdata (pd.DataFrame): Batch data. First column is batch ID; second column must be a phase label (``'Phase'``, ``'phase'``, or ``'PHASE'``); remaining columns are process variables. Batches are stacked vertically. nsamples (dict): Alignment specification per phase. Each value can be: an integer for linear alignment, e.g. ``{'Heating': 100, 'Reaction': 200}``; a 4-element list for IV alignment with known start and end, e.g. ``['TIC101', 100, 30, 50]`` (IVarID, num_samples, start_value, end_value); or a 3-element list for IV alignment with known end only, e.g. ``['TIC101', 100, 50]`` where start_value is taken from the first row of that phase. The indicator variable must be monotonically increasing or decreasing within the phase; non-monotonic samples are removed with a warning before interpolation. Returns: pd.DataFrame: Aligned batch data with each phase resampled as specified, phases concatenated in key order of ``nsamples``. """ bdata_a=[] if (bdata.columns[1]=='PHASE') or \ (bdata.columns[1]=='phase') or \ (bdata.columns[1]=='Phase'): phase=True else: phase = False if phase: aux=bdata.drop_duplicates(subset=bdata.columns[0],keep='first') unique_batches=aux[aux.columns[0]].values.tolist() for b in unique_batches: data_=bdata[bdata[bdata.columns[0]]==b] vals_rs=[] bname_rs=[] phase_rs=[] firstone=True for p in nsamples.keys(): p_data= data_[data_[data_.columns[1]]==p] samps=nsamples[p] if not(isinstance(samps,list)): #Linear alignment indx=np.arange(p_data.shape[0]) new_indx=np.linspace(0,p_data.shape[0]-1,samps) bname_rs_=[p_data[p_data.columns[0]].values[0]]*samps phase_rs_=[p_data[p_data.columns[1]].values[0]]*samps vals=p_data.values[:,2:] cols=p_data.columns[2:] vals_rs_=np.zeros((samps,vals.shape[1])) for i in np.arange(vals.shape[1]): x_=indx.astype('float') y_=vals[:,i].astype('float') x_=x_[~np.isnan(y_)] y_=y_[~np.isnan(y_)] #vals_rs_[:,i]=np.interp(new_indx,indx.astype('float'),vals[:,i].astype('float')) if len(y_)==0: vals_rs_[:,i]=np.nan else: vals_rs_[:,i]=np.interp(new_indx,x_,y_,left=np.nan,right=np.nan) elif isinstance(samps,list): #align this phase with IV if len(samps)==4: iv_id=samps[0] nsamp=samps[1] start=samps[2] end =samps[3] new_indx=np.linspace(start,end,nsamp) iv = p_data[iv_id].values.astype(float) if len(samps)==3 : iv_id = samps[0] nsamp = samps[1] end = samps[2] if (not( isinstance(end,int) or isinstance(end,float)) and (isinstance(end,dict)) and not(firstone)): #end is model to estimate the end value based on previous trajectory cols=p_data.columns[2:].tolist() fakebatch=pd.DataFrame(vals_rs,columns=cols) fakebatch.insert(0,'phase',[p]*vals_rs.shape[0]) fakebatch.insert(0,'BatchID',[b]*vals_rs.shape[0]) preds=predict(fakebatch, end) end_hat=preds['Yhat'][preds['Yhat'].columns[1]].values[0] iv = p_data[iv_id].values.astype(float) end_hat = end_hat + iv[0] new_indx=np.linspace(iv[0],end_hat,nsamp) else: iv = p_data[iv_id].values.astype(float) new_indx=np.linspace(iv[0],end,nsamp) cols=p_data.columns[2:].tolist() #indx_2_iv=cols.index(iv_id) bname_rs_=[p_data[p_data.columns[0]].values[0]]*nsamp phase_rs_=[p_data[p_data.columns[1]].values[0]]*nsamp #Check monotonicity if (np.all(np.diff(iv)<0) or np.all(np.diff(iv)>0)): indx=iv vals=p_data.values[:,2:] #replace the IV with a linear variable (time surrogate) #linear_indx=np.arange(vals.shape[0]) #vals[:,indx_2_iv]=linear_indx vals_rs_=np.zeros((nsamp,vals.shape[1])) for i in np.arange(vals.shape[1]): x_=indx.astype('float') y_=vals[:,i].astype('float') x_=x_[~np.isnan(y_)] y_=y_[~np.isnan(y_)] #vals_rs_[:,i]=np.interp(new_indx,indx.astype('float'),vals[:,i].astype('float')) #vals_rs_[:,i]=np.interp(new_indx,indx ,vals[:,i].astype('float'),left=np.nan,right=np.nan) if len(y_)==0: vals_rs_[:,i]=np.nan else: vals_rs_[:,i]=np.interp(new_indx,x_,y_,left=np.nan,right=np.nan) else: #if monotonicity fails try to remove non-monotonic vals print('Indicator variable '+iv_id+' for batch '+ b +' is not monotinic') print('this is not ideal, maybe rethink your IV') print('trying to remove non-monotonic samples') x=np.arange(0,len(iv)) m=(1/sum(x**2))*sum(x*(iv-iv[0])) #if iv[0]-iv[-1] > 0 : if m<0: expected_direction = -1 #decreasing IV else: expected_direction = 1 #increasing IV vals=p_data.values[:,2:] new_vals=vals[0,:] prev_iv=iv[0] mon_iv=[iv[0]] for i in np.arange(1,vals.shape[0]): if (np.sign(iv[i] - prev_iv) * expected_direction)==1: #monotonic new_vals=np.vstack((new_vals,vals[i,:])) prev_iv=iv[i] mon_iv.append(iv[i]) indx=np.array(mon_iv) vals=new_vals.copy() #replace the IV with a linear variable (time surrogate) #linear_indx=np.arange(vals.shape[0]) #vals[:,indx_2_iv]=linear_indx vals_rs_=np.zeros((nsamp,vals.shape[1])) for i in np.arange(vals.shape[1]): x_=indx.astype('float') y_=vals[:,i].astype('float') x_=x_[~np.isnan(y_)] y_=y_[~np.isnan(y_)] #vals_rs_[:,i]=np.interp(new_indx,indx,vals[:,i].astype('float'),left=np.nan,right=np.nan) if len(y_)==0: vals_rs_[:,i]=np.nan else: vals_rs_[:,i]=np.interp(new_indx,x_,y_,left=np.nan,right=np.nan) if firstone: vals_rs=vals_rs_ firstone=False else: vals_rs=np.vstack((vals_rs,vals_rs_)) bname_rs.extend(bname_rs_) phase_rs.extend(phase_rs_) df_=pd.DataFrame(vals_rs,columns=cols) df_.insert(0,bdata.columns[1],phase_rs) df_.insert(0,bdata.columns[0],bname_rs) bdata_a.append(df_) bdata_a=pd.concat(bdata_a) return bdata_a
[docs] def plot_var_all_batches(bdata,*,which_var=False,plot_title='',mkr_style='.-', phase_samples=False,alpha_=0.2,timecolumn=False,lot_legend=False): """Plot trajectories of one or more variables for all batches. Produces one Matplotlib figure per variable, with each batch overlaid as a separate line. Optionally adds phase boundary annotations and a batch legend. Args: bdata (pd.DataFrame): Batch data. First column is batch ID; second column may optionally be a phase label; remaining columns are process variables. Batches are stacked vertically. which_var (list[str], str, or bool): Variables to plot. If ``False`` (default), all process variables are plotted. plot_title (str): Title applied to all figures. Default ``''``. mkr_style (str): Matplotlib line/marker style string, e.g. ``'.-'`` (default), ``'o'``, ``'-'``. phase_samples (dict or bool): Phase structure used to annotate phase boundaries as vertical magenta lines. Pass the same ``nsamples`` dict used for alignment. Default ``False`` (no annotations). alpha_ (float): Transparency of phase boundary lines (0–1). Default ``0.2``. timecolumn (str or bool): If a column name is given, the x-axis uses the values in that column instead of the sample sequence index. Default ``False``. lot_legend (bool): If ``True``, adds a legend showing each batch ID. Default ``False``. Returns: None: Displays one Matplotlib figure per variable. """ var_list=which_var if (bdata.columns[1]=='PHASE') or \ (bdata.columns[1]=='phase') or \ (bdata.columns[1]=='Phase'): if isinstance(var_list,bool): var_list=bdata.columns[2:].tolist() else: if isinstance(var_list,bool): var_list=bdata.columns[1:].tolist() if isinstance(var_list,str) and not(isinstance(var_list,list)): var_list=[var_list] for v in var_list: plt.figure() if not(isinstance(timecolumn,bool)): dat=bdata[[bdata.columns[0],timecolumn,v]] else: dat=bdata[[bdata.columns[0],v]] for b in np.unique(dat[bdata.columns[0]]): data_=dat[v][dat[dat.columns[0]]==b] if not(isinstance(timecolumn,bool)): tim=dat[timecolumn][dat[dat.columns[0]]==b] if lot_legend: plt.plot(tim.values,data_.values,mkr_style,label=b) else: plt.plot(tim.values,data_.values,mkr_style) plt.xlabel(timecolumn) else: if lot_legend: plt.plot(1+np.arange(len(data_.values)),data_.values,mkr_style,label=b) else: plt.plot(1+np.arange(len(data_.values)),data_.values,mkr_style) plt.xlabel('Sample') plt.ylabel(v) if lot_legend: plt.legend(loc='center left', bbox_to_anchor=(1, 0.5)) if not(isinstance(phase_samples,bool)): s_txt=1 s_lin=1 plt.axvline(x=1,color='magenta',alpha=alpha_) for p in phase_samples.keys(): if isinstance(phase_samples[p],list): s_lin+=phase_samples[p][1] else: s_lin+=phase_samples[p] plt.axvline(x=s_lin,color='magenta',alpha=alpha_) ylim_=plt.ylim() plt.annotate(p, (s_txt,ylim_[0]),rotation=90,alpha=0.5,color='magenta') if isinstance(phase_samples[p],list): s_txt+=phase_samples[p][1] else: s_txt+=phase_samples[p] plt.title(plot_title) plt.tight_layout()
[docs] def plot_batch(bdata,which_batch,which_var,*,include_mean_exc=False,include_set=False, phase_samples=False,single_plot=False,plot_title=''): """Plot the trajectory of one or more batches for selected variables. Highlights the specified batch(es) in black against an optional backdrop of all other batch trajectories and/or their mean. Args: bdata (pd.DataFrame): Batch data. First column is batch ID; second column may optionally be a phase label; remaining columns are process variables. Batches are stacked vertically. which_batch (str or list[str]): Batch ID(s) to highlight. which_var (str or list[str]): Variable name(s) to plot. include_mean_exc (bool): If ``True``, overlays the mean trajectory of all other batches (excluding the highlighted batch) in red. Default ``False``. include_set (bool): If ``True``, overlays all other batch trajectories in light magenta for context. Default ``False``. phase_samples (dict or bool): Phase structure for annotating phase boundaries. Pass the same ``nsamples`` dict used for alignment. Default ``False`` (no annotations). single_plot (bool): If ``True``, plots all selected variables on a single axis. If ``False`` (default), each variable gets its own figure. plot_title (str): Text appended to each figure title after the batch ID. Default ``''``. Returns: None: Displays one or more Matplotlib figures. """ if isinstance(which_batch,str) and not(isinstance(which_batch,list)): which_batch=[which_batch] if isinstance(which_var,str) and not(isinstance(which_var,list)): which_var=[which_var] if single_plot: plt.figure() first_pass=True for b in which_batch: this_batch=bdata[bdata[bdata.columns[0]]==b] all_others=bdata[bdata[bdata.columns[0]]!=b] for v in which_var: this_var_this_batch=this_batch[v].values if not(single_plot): plt.figure() x_axis=np.arange(len(this_var_this_batch ))+1 plt.plot(x_axis,this_var_this_batch,'k',label=b) if include_mean_exc or include_set: this_var_all_others=[] max_obs=0 for bb in np.unique(all_others[bdata.columns[0]]): this_var_all_others.append(all_others[v][all_others[bdata.columns[0]]==bb ].values) if len(all_others[v][all_others[bdata.columns[0]]==bb ].values ) > max_obs: max_obs = len(all_others[v][all_others[bdata.columns[0]]==bb ].values ) this_var_all_others_=[] for bb in np.unique(all_others[bdata.columns[0]]): _to_append=all_others[v][all_others[bdata.columns[0]]==bb ].values this_len=len(_to_append) _to_append=np.append(_to_append,np.tile(np.nan,max_obs-this_len)) this_var_all_others_.append(_to_append) this_var_all_others_=np.array(this_var_all_others_) x_axis=np.arange(max_obs)+1 if not(single_plot): if include_set: for t in this_var_all_others: x_axis=np.arange(len(t))+1 plt.plot(x_axis,t,'m',alpha=0.1) plt.plot(x_axis,this_var_all_others[-1],'m',alpha=0.1,label='rest of set') if include_mean_exc: plt.plot(x_axis,mean(this_var_all_others_,axis=0),'r',label='Mean without '+b ) else: if first_pass: if include_set: plt.plot(x_axis,this_var_all_others_.T,'m',alpha=0.1) plt.plot(x_axis,this_var_all_others_[0,:],'m',alpha=0.1,label='rest of set') if include_mean_exc: plt.plot(x_axis,mean(this_var_all_others_,axis=0),'r',label='Mean without '+b ) first_pass=False if not(isinstance(phase_samples,bool)): s_txt=1 s_lin=1 plt.axvline(x=1,color='magenta',alpha=0.2) for p in phase_samples.keys(): if isinstance(phase_samples[p],list): s_lin+=phase_samples[p][1] else: s_lin+=phase_samples[p] plt.axvline(x=s_lin,color='magenta',alpha=0.2) ylim_=plt.ylim() plt.annotate(p, (s_txt,ylim_[0]),rotation=90,alpha=0.5,color='magenta') if isinstance(phase_samples[p],list): s_txt+=phase_samples[p][1] else: s_txt+=phase_samples[p] plt.title(b+' ' + plot_title) plt.xlabel('sample') plt.ylabel(v) plt.legend(bbox_to_anchor=(1.04,1), borderaxespad=0) plt.tight_layout()
[docs] def unfold_horizontal(bdata): """Unfold batch data horizontally (batch-wise unfolding). Reshapes aligned batch data from the vertical stacked format (samples × variables) into a 2-D matrix where each row is one batch and columns represent all variables at all time points: ``[Var1_t1, Var1_t2, ..., Var1_tN, Var2_t1, ...]``. This is the standard preprocessing step before fitting :func:`mpca` or :func:`mpls` with batch-wise unfolding. Args: bdata (pd.DataFrame): Aligned batch data. First column is batch ID; second column may optionally be a phase label; remaining columns are process variables. All batches must have the same number of rows. Returns: tuple: - **bdata_hor** (pd.DataFrame): Unfolded matrix, one row per batch. First column is batch ID; remaining columns are variable-time combinations. - **colnames** (list[str]): Column names of the unfolded matrix (e.g. ``['Var1_1', 'Var1_2', ..., 'VarN_T']``). - **bid** (list[str]): Variable-block identifiers — for each column in ``bdata_hor``, the name of the original process variable it belongs to. Used internally to reconstruct block structure. """ if (bdata.columns[1]=='PHASE') or \ (bdata.columns[1]=='phase') or \ (bdata.columns[1]=='Phase'): phase=True else: phase = False firstone=True clbl=[] bid=[] aux=bdata.drop_duplicates(subset=bdata.columns[0],keep='first') unique_batches=aux[aux.columns[0]].values.tolist() for b in unique_batches: data_=bdata[bdata[bdata.columns[0]]==b] if phase: vals=data_.values[:,2:] cols=data_.columns[2:] else: vals=data_.values[:,1:] cols=data_.columns[1:] row_=[] firstone_c=True for c in np.arange(vals.shape[1]): r_= vals[:,[c]].reshape(1,-1) if firstone: for i in np.arange(1,r_.shape[1]+1): clbl.append(cols[c]+'_'+str(i)) bid.append(cols[c] ) if firstone_c: row_=r_ firstone_c=False else: row_=np.hstack((row_,r_)) if firstone: bdata_hor=row_ firstone=False else: bdata_hor=np.vstack((bdata_hor,row_)) bdata_hor= pd.DataFrame(bdata_hor,columns=clbl) bdata_hor.insert(0, bdata.columns[0],unique_batches) return bdata_hor,clbl,bid
[docs] def refold_horizontal(xuf,nvars,nsamples): """Refold a horizontally unfolded batch matrix back to 3-D array form. Inverts the operation of :func:`unfold_horizontal`, converting a 2-D unfolded matrix (one row per batch) back to a 3-D arrangement (total_samples × nvars), suitable for conversion back to a DataFrame. Args: xuf (np.ndarray): Horizontally unfolded batch data, shape (n_batches × (nvars × nsamples)). Strictly numeric — no ID column. nvars (int): Number of process variables per sample. nsamples (int): Number of time samples per batch. Returns: np.ndarray: Refolded array of shape (n_batches × nsamples, nvars), where the rows for each batch are stacked vertically. """ #xuf is strictly numberical Xb=[] for i in np.arange(xuf.shape[0]): r=xuf[i,:] for v in np.arange(nvars): var=r[:nsamples] var=var.reshape(-1,1) if v<(nvars-1): r=r[nsamples:] if v==0: batch=var else: batch=np.hstack((batch,var)) if i==0: Xb=batch else: Xb=np.vstack((Xb,batch)) return Xb
def _uf_l(L,spb,vpb): first_=True for i in np.arange(L.shape[1]): col_=L[:,[i]] s_=np.arange( spb ) first_flag=True for v in np.arange(vpb): s_2use=(s_ + v*spb) if first_flag: col_rearranged=col_[s_2use] first_flag=False else: col_rearranged = np.hstack((col_rearranged,col_[s_2use])) col_rearranged = col_rearranged.reshape(1,-1) if first_: L_uf_hor_mon = col_rearranged.T first_ = False else: L_uf_hor_mon = np.hstack((L_uf_hor_mon,col_rearranged.T )) return L_uf_hor_mon def _uf_hor_mon_loadings(mvmobj): spb = mvmobj['nsamples'] vpb = mvmobj['nvars'] is_pls=False if 'Q' in mvmobj: is_pls=True ninit=mvmobj['ninit'] if is_pls: if ninit > 0: z_ws = mvmobj['Ws'][:ninit,:] z_w = mvmobj['W'][:ninit,:] z_p = mvmobj['P'][:ninit,:] z_mx = mvmobj['mx'][:ninit] z_sx = mvmobj['sx'][:ninit] x_ws = mvmobj['Ws'][ninit:,:] x_w = mvmobj['W'][ninit:,:] x_p = mvmobj['P'][ninit:,:] x_mx = mvmobj['mx'][ninit:] x_sx = mvmobj['sx'][ninit:] Ws_ufm = np.vstack(( z_ws,_uf_l(x_ws,spb,vpb))) W_ufm = np.vstack(( z_w ,_uf_l(x_w ,spb,vpb))) P_ufm = np.vstack(( z_p ,_uf_l(x_p,spb,vpb))) mx_ufm = np.vstack(( z_mx.reshape(-1,1),_uf_l( x_mx.reshape(-1,1),spb,vpb ))).reshape(-1) sx_ufm = np.vstack(( z_sx.reshape(-1,1),_uf_l( x_sx.reshape(-1,1),spb,vpb ))).reshape(-1) else: Ws_ufm = _uf_l(mvmobj['Ws'],spb,vpb) W_ufm = _uf_l(mvmobj['W'],spb,vpb) P_ufm = _uf_l(mvmobj['P'],spb,vpb) mx_ufm = _uf_l( mvmobj['mx'].reshape(-1,1),spb,vpb ).reshape(-1) sx_ufm = _uf_l( mvmobj['sx'].reshape(-1,1),spb,vpb ).reshape(-1) else: P_ufm = _uf_l(mvmobj['P'],spb,vpb) mx_ufm = _uf_l( mvmobj['mx'].reshape(-1,1),spb,vpb ).reshape(-1) sx_ufm = _uf_l( mvmobj['sx'].reshape(-1,1),spb,vpb ).reshape(-1) if is_pls: mvmobj['Ws_ufm'] = Ws_ufm mvmobj['W_ufm'] = W_ufm mvmobj['P_ufm'] = P_ufm else: mvmobj['P_ufm'] = P_ufm mvmobj['mx_ufm']=mx_ufm mvmobj['sx_ufm']=sx_ufm return mvmobj
[docs] def loadings(mmvm_obj,dim,*,r2_weighted=False,which_var=False): """Plot batch model loadings as a function of sample number. For each process variable, produces a filled-area plot of the loading (W* for PLS, P for PCA) vs. sample index, so the temporal pattern of each variable's influence on the model can be inspected visually. For PLS models with initial conditions (``ninit > 0``), an additional bar chart is produced for the initial-condition variables. Args: mmvm_obj (dict): Multi-way PCA model from :func:`mpca` or multi-way PLS model from :func:`mpls`. dim (int): Component index to plot (1-indexed). r2_weighted (bool): If ``True``, multiplies each loading by its corresponding per-variable R² value before plotting, so variables that explain more variance appear larger. Default ``False``. which_var (str, list[str], or bool): Process variable name(s) to plot. If ``False`` (default), all variables are plotted. Returns: None: Displays one Matplotlib figure per variable (plus one bar chart for initial conditions if applicable). """ dim=dim-1 if 'Q' in mmvm_obj: if mmvm_obj['ninit']==0: if r2_weighted: aux_df=pd.DataFrame(mmvm_obj['Ws']*mmvm_obj['r2xpv']) else: aux_df=pd.DataFrame(mmvm_obj['Ws']) aux_df.insert(0,'bid',mmvm_obj['bid']) if isinstance(which_var,bool): vars_to_plot=unique(aux_df,'bid') else: if isinstance(which_var,str): vars_to_plot=[which_var] if isinstance(which_var,list): vars_to_plot=which_var for i,v in enumerate(vars_to_plot): plt.figure() dat=aux_df[dim][aux_df['bid']==v].values plt.fill_between(np.arange(mmvm_obj['nsamples'])+1, dat ) plt.xlabel('sample') if r2_weighted: plt.ylabel('$W^* * R^2$ ['+str(dim+1)+']') else: plt.ylabel('$W^*$ ['+str(dim+1)+']') plt.title(v) plt.ylim(mmvm_obj['Ws'][:,dim].min()*1.2,mmvm_obj['Ws'][:,dim].max()*1.2 ) ylim_=plt.ylim() phase_samples=mmvm_obj['phase_samples'] if not(isinstance(phase_samples,bool)): s_txt=1 s_lin=1 plt.axvline(x=1,color='magenta',alpha=0.2) for p in phase_samples.keys(): if isinstance(phase_samples[p],list): s_lin+=phase_samples[p][1] else: s_lin+=phase_samples[p] plt.axvline(x=s_lin,color='magenta',alpha=0.2) # ylim_=[mmvm_obj['Ws'][dim,:].min()*1.2,mmvm_obj['Ws'][dim,:].max()*1.2 ] plt.annotate(p, (s_txt,ylim_[0]),rotation=90,alpha=0.5,color='magenta') if isinstance(phase_samples[p],list): s_txt+=phase_samples[p][1] else: s_txt+=phase_samples[p] plt.tight_layout() else: z_loadings= mmvm_obj['Ws'] [np.arange(mmvm_obj['ninit'])] r2pvz = mmvm_obj['r2xpv'] [np.arange(mmvm_obj['ninit']),:] if r2_weighted: z_loadings = z_loadings *r2pvz zvars = mmvm_obj['varidX'][0:mmvm_obj['ninit']] plt.figure() plt.bar(zvars,z_loadings[:,dim] ) plt.xticks(rotation=90) if r2_weighted: plt.ylabel('$W^* * R^2$ ['+str(dim+1)+']') else: plt.ylabel('$W^*$ ['+str(dim+1)+']') plt.title('Loadings for Initial Conditions') plt.tight_layout() rows_=np.arange( mmvm_obj['nsamples']*mmvm_obj['nvars'])+mmvm_obj['ninit'] if r2_weighted: aux_df=pd.DataFrame(mmvm_obj['Ws'][rows_,:]*mmvm_obj['r2xpv'][rows_,:] ) else: aux_df=pd.DataFrame(mmvm_obj['Ws'][rows_,:] ) aux_df.insert(0,'bid',mmvm_obj['bid']) if isinstance(which_var,bool): vars_to_plot=unique(aux_df,'bid') else: if isinstance(which_var,str): vars_to_plot=[which_var] if isinstance(which_var,list): vars_to_plot=which_var for i,v in enumerate(vars_to_plot): plt.figure() dat=aux_df[dim][aux_df['bid']==v].values plt.fill_between(np.arange(mmvm_obj['nsamples'])+1, dat ) plt.xlabel('sample') if r2_weighted: plt.ylabel('$W^* * R^2$ ['+str(dim+1)+']') else: plt.ylabel('$W^*$ ['+str(dim+1)+']') plt.title(v) plt.ylim(mmvm_obj['Ws'][:,dim].min()*1.2,mmvm_obj['Ws'][:,dim].max()*1.2 ) ylim_=plt.ylim() phase_samples=mmvm_obj['phase_samples'] if not(isinstance(phase_samples,bool)): s_txt=1 s_lin=1 plt.axvline(x=1,color='magenta',alpha=0.2) for p in phase_samples.keys(): if isinstance(phase_samples[p],list): s_lin+=phase_samples[p][1] else: s_lin+=phase_samples[p] plt.axvline(x=s_lin,color='magenta',alpha=0.2) # ylim_=[mmvm_obj['Ws'][dim,:].min()*1.2,mmvm_obj['Ws'][dim,:].max()*1.2 ] plt.annotate(p, (s_txt,ylim_[0]),rotation=90,alpha=0.5,color='magenta') if isinstance(phase_samples[p],list): s_txt+=phase_samples[p][1] else: s_txt+=phase_samples[p] plt.tight_layout() else: if r2_weighted: aux_df=pd.DataFrame(mmvm_obj['P']*mmvm_obj['r2xpv']) else: aux_df=pd.DataFrame(mmvm_obj['P']) aux_df.insert(0,'bid',mmvm_obj['bid']) if isinstance(which_var,bool): vars_to_plot=unique(aux_df,'bid') else: if isinstance(which_var,str): vars_to_plot=[which_var] if isinstance(which_var,list): vars_to_plot=which_var for i,v in enumerate(vars_to_plot): plt.figure() dat=aux_df[dim][aux_df['bid']==v].values plt.fill_between(np.arange(mmvm_obj['nsamples'])+1, dat ) plt.xlabel('sample') if r2_weighted: plt.ylabel('P * $R^2$ ['+str(dim+1)+']') else: plt.ylabel('P ['+str(dim+1)+']') plt.title(v) plt.ylim(mmvm_obj['P'][:,dim].min()*1.2,mmvm_obj['P'][:,dim].max()*1.2 ) ylim_=plt.ylim() phase_samples=mmvm_obj['phase_samples'] if not(isinstance(phase_samples,bool)): s_txt=1 s_lin=1 plt.axvline(x=1,color='magenta',alpha=0.2) for p in phase_samples.keys(): if isinstance(phase_samples[p],list): s_lin+=phase_samples[p][1] else: s_lin+=phase_samples[p] plt.axvline(x=s_lin,color='magenta',alpha=0.2) #ylim_=[mmvm_obj['P'][dim,:].min()*1.2,mmvm_obj['P'][dim,:].max()*1.2 ] plt.annotate(p, (s_txt,ylim_[0]),rotation=90,alpha=0.5,color='magenta') if isinstance(phase_samples[p],list): s_txt+=phase_samples[p][1] else: s_txt+=phase_samples[p] plt.tight_layout()
[docs] def loadings_abs_integral(mmvm_obj,*,r2_weighted=False,addtitle=False): """Plot the integral of absolute loadings per variable across all LVs/PCs. For each latent variable / principal component, produces a bar chart where each bar represents the sum of absolute loading values over all time samples for that variable. This gives a scalar importance measure for each process variable per component. Args: mmvm_obj (dict): Multi-way PCA model from :func:`mpca` or multi-way PLS model from :func:`mpls`. r2_weighted (bool): If ``True``, weights each loading by its per-variable R² before summing, emphasising variables that also explain more variance. Default ``False``. addtitle (str or bool): Optional string to use as the figure title. If ``False`` (default), no title is added. Returns: None: Displays one Matplotlib figure per latent variable / PC. """ if 'Q' in mmvm_obj: if mmvm_obj['ninit']==0: if r2_weighted: aux_df=pd.DataFrame(mmvm_obj['Ws']*mmvm_obj['r2xpv']) else: aux_df=pd.DataFrame(mmvm_obj['Ws']) aux_df.insert(0,'bid',mmvm_obj['bid']) else: rows_=np.arange( mmvm_obj['nsamples']*mmvm_obj['nvars'])+mmvm_obj['ninit'] if r2_weighted: aux_df=pd.DataFrame(mmvm_obj['Ws'][rows_,:]*mmvm_obj['r2xpv'][rows_,:] ) else: aux_df=pd.DataFrame(mmvm_obj['Ws'][rows_,:] ) aux_df.insert(0,'bid',mmvm_obj['bid']) integral_of_loadings=[] aux_vname=[] for i,v in enumerate(unique(aux_df,'bid')): dat=aux_df[aux_df['bid']==v].values[:,1:] integral_of_loadings.append(np.sum(np.abs(dat),axis=0,keepdims=True)[0]) aux_vname.append(v) integral_of_loadings=np.array(integral_of_loadings) for a in np.arange(mmvm_obj['T'].shape[1]): plt.figure() plt.bar(aux_vname,integral_of_loadings[:,a]) if r2_weighted: plt.ylabel(r'$\sum (|W^*| * R^2$ ['+str(a+1)+'])') else: plt.ylabel(r'$\sum (|W^*|$ ['+str(a+1)+'])') plt.xticks(rotation=75) if not(isinstance(addtitle,bool)) and isinstance(addtitle,str): plt.title(addtitle) plt.tight_layout() else: if r2_weighted: aux_df=pd.DataFrame(mmvm_obj['P']*mmvm_obj['r2xpv']) else: aux_df=pd.DataFrame(mmvm_obj['P']) aux_df.insert(0,'bid',mmvm_obj['bid']) integral_of_loadings=[] aux_vname=[] for i,v in enumerate(unique(aux_df,'bid')): dat=aux_df[aux_df['bid']==v].values[:,1:] integral_of_loadings.append(np.sum(np.abs(dat),axis=0,keepdims=True)[0]) aux_vname.append(v) integral_of_loadings=np.array(integral_of_loadings) for a in np.arange(mmvm_obj['T'].shape[1]): plt.figure() plt.bar(aux_vname,integral_of_loadings[:,a]) if r2_weighted: plt.ylabel(r'$\sum (|P| * R^2$ ['+str(a+1)+'])') else: plt.ylabel(r'$\sum$ (|P| ['+str(a+1)+'])') plt.xticks(rotation=75) if not(isinstance(addtitle,bool)) and isinstance(addtitle,str): plt.title(addtitle) plt.tight_layout()
[docs] def batch_vip(mmvm_obj,*,addtitle=False): """Plot a batch-level VIP score summarising variable importance across all LVs. Computes a scalar VIP-like score for each process variable by summing the absolute loadings weighted by R²Y across all latent variables, then summing over all time samples. The result is shown as a bar chart, sorted by variable (not by VIP magnitude). This is conceptually analogous to the standard VIP but adapted for the temporal, unfolded batch model structure. Args: mmvm_obj (dict): Multi-way PLS model from :func:`mpls`. For PCA models the plot uses R²X weighting instead of R²Y. addtitle (str or bool): Optional string to use as the figure title. If ``False`` (default), no title is added. Returns: None: Displays a single Matplotlib bar chart. """ r2_weighted=True if 'Q' in mmvm_obj: if mmvm_obj['ninit']==0: if r2_weighted: aux_df=pd.DataFrame(mmvm_obj['Ws']*np.tile(mmvm_obj['r2y'],(mmvm_obj['Ws'].shape[0],1))) else: aux_df=pd.DataFrame(mmvm_obj['Ws']) aux_df.insert(0,'bid',mmvm_obj['bid']) else: rows_=np.arange( mmvm_obj['nsamples']*mmvm_obj['nvars'])+mmvm_obj['ninit'] if r2_weighted: aux_df=pd.DataFrame(mmvm_obj['Ws'][rows_,:]*mmvm_obj['r2xpv'][rows_,:] ) else: aux_df=pd.DataFrame(mmvm_obj['Ws'][rows_,:] ) aux_df.insert(0,'bid',mmvm_obj['bid']) integral_of_loadings=[] aux_vname=[] for i,v in enumerate(unique(aux_df,'bid')): dat=aux_df[aux_df['bid']==v].values[:,1:] integral_of_loadings.append(np.sum(np.abs(dat),axis=0,keepdims=True)[0]) aux_vname.append(v) integral_of_loadings=np.array(integral_of_loadings) plt.figure() plt.bar(aux_vname,np.sum(integral_of_loadings,axis=1)) plt.ylabel('Batch VIP') plt.xticks(rotation=75) if not(isinstance(addtitle,bool)) and isinstance(addtitle,str): plt.title(addtitle) plt.tight_layout() else: if r2_weighted: aux_df=pd.DataFrame(mmvm_obj['P']*mmvm_obj['r2xpv']) else: aux_df=pd.DataFrame(mmvm_obj['P']) aux_df.insert(0,'bid',mmvm_obj['bid']) integral_of_loadings=[] aux_vname=[] for i,v in enumerate(unique(aux_df,'bid')): dat=aux_df[aux_df['bid']==v].values[:,1:] integral_of_loadings.append(np.sum(np.abs(dat),axis=0,keepdims=True)[0]) aux_vname.append(v) integral_of_loadings=np.array(integral_of_loadings) plt.figure() plt.bar(aux_vname,np.sum(integral_of_loadings,axis=1)) plt.ylabel(r'$\sum_{a} \sum (|P| * R^2$') plt.xticks(rotation=75) if not(isinstance(addtitle,bool)) and isinstance(addtitle,str): plt.title(addtitle) plt.tight_layout()
[docs] def r2pv(mmvm_obj,*,which_var=False): """Plot cumulative R² per variable as a function of sample number. For each process variable, produces a stacked filled-area plot where each band represents the cumulative R² contribution of one LV/PC at each time sample. For PLS models, a separate stacked bar chart is also produced for the Y-space R²pvY. For models with initial conditions (``ninit > 0``), an additional bar chart is produced for the initial-condition variable R² values. Args: mmvm_obj (dict): Multi-way PCA model from :func:`mpca` or multi-way PLS model from :func:`mpls`. which_var (str, list[str], or bool): Process variable name(s) to plot. If ``False`` (default), all variables are plotted. Returns: None: Displays one Matplotlib figure per variable, plus additional figures for Y-space and initial conditions where applicable. """ if mmvm_obj['ninit']==0: aux_df=pd.DataFrame(mmvm_obj['r2xpv']) aux_df.insert(0,'bid',mmvm_obj['bid']) if isinstance(which_var,bool): vars_to_plot=unique(aux_df,'bid') else: if isinstance(which_var,str): vars_to_plot=[which_var] if isinstance(which_var,list): vars_to_plot=which_var for i,v in enumerate(vars_to_plot): dat=aux_df[aux_df['bid']==v].values*100 dat=dat[:,1:].astype(float) dat=np.cumsum(dat,axis=1) dat=np.hstack((np.zeros((dat.shape[0],1)) ,dat)) plt.figure() for a in np.arange(mmvm_obj['A'])+1: plt.fill_between(np.arange(mmvm_obj['nsamples'])+1, dat[:,a],dat[:,a-1],label='LV #'+str(a) ) plt.xlabel('sample') plt.ylabel('$R^2$pvX (%)') plt.legend() plt.title(v) plt.ylim(0,100) ylim_=plt.ylim() phase_samples=mmvm_obj['phase_samples'] if not(isinstance(phase_samples,bool)): s_txt=1 s_lin=1 plt.axvline(x=1,color='black',alpha=0.2) for p in phase_samples.keys(): if isinstance(phase_samples[p],list): s_lin+=phase_samples[p][1] else: s_lin+=phase_samples[p] plt.axvline(x=s_lin,color='black',alpha=0.2) plt.annotate(p, (s_txt,ylim_[0]),rotation=90,alpha=0.5,color='black') if isinstance(phase_samples[p],list): s_txt+=phase_samples[p][1] else: s_txt+=phase_samples[p] plt.tight_layout() if 'Q' in mmvm_obj: r2pvy = mmvm_obj['r2ypv']*100 #yvars=mmvm_obj['varidY'] lbls=[] for a in np.arange(1,mmvm_obj['A']+1): lbls.append('LV #'+str(a)) r2pvy_pd=pd.DataFrame(r2pvy,index=mmvm_obj['varidY'],columns=lbls) fig1,ax1=plt.subplots() r2pvy_pd.plot(kind='bar', stacked=True,ax=ax1) #ax.set_xticks(rotation=90) ax1.set_ylabel('$R^2$pvY') ax1.set_title('$R^2$ per LV for Y-Space') fig1.tight_layout() else: r2pvz = mmvm_obj['r2xpv'] [np.arange(mmvm_obj['ninit']),:]*100 zvars = mmvm_obj['varidX'][0:mmvm_obj['ninit']] lbls=[] for a in np.arange(1,mmvm_obj['A']+1): lbls.append('LV #'+str(a)) r2pvz_pd=pd.DataFrame(r2pvz,index=zvars,columns=lbls) fig2,ax2=plt.subplots() r2pvz_pd.plot(kind='bar', stacked=True,ax=ax2) ax2.set_ylabel('$R^2$pvZ') ax2.set_title('$R^2$ Initial Conditions') fig2.tight_layout() rows_=np.arange( mmvm_obj['nsamples']*mmvm_obj['nvars'])+mmvm_obj['ninit'] aux_df=pd.DataFrame(mmvm_obj['r2xpv'][rows_,:] ) aux_df.insert(0,'bid',mmvm_obj['bid']) if isinstance(which_var,bool): vars_to_plot=unique(aux_df,'bid') else: if isinstance(which_var,str): vars_to_plot=[which_var] if isinstance(which_var,list): vars_to_plot=which_var for i,v in enumerate(vars_to_plot): dat=aux_df[aux_df['bid']==v].values*100 dat=dat[:,1:].astype(float) dat=np.cumsum(dat,axis=1) dat=np.hstack((np.zeros((dat.shape[0],1)) ,dat)) plt.figure() for a in np.arange(mmvm_obj['A'])+1: plt.fill_between(np.arange(mmvm_obj['nsamples'])+1, dat[:,a],dat[:,a-1],label='LV #'+str(a) ) plt.xlabel('sample') plt.ylabel('$R^2$pvX (%)') plt.legend() plt.title(v) plt.ylim(0,100) ylim_=plt.ylim() phase_samples=mmvm_obj['phase_samples'] if not(isinstance(phase_samples,bool)): s_txt=1 s_lin=1 plt.axvline(x=1,color='black',alpha=0.2) for p in phase_samples.keys(): if isinstance(phase_samples[p],list): s_lin+=phase_samples[p][1] else: s_lin+=phase_samples[p] plt.axvline(x=s_lin,color='black',alpha=0.2) plt.annotate(p, (s_txt,ylim_[0]),rotation=90,alpha=0.5,color='black') if isinstance(phase_samples[p],list): s_txt+=phase_samples[p][1] else: s_txt+=phase_samples[p] plt.tight_layout() if 'Q' in mmvm_obj: r2pvy = mmvm_obj['r2ypv']*100 #yvars=mmvm_obj['varidY'] lbls=[] for a in np.arange(1,mmvm_obj['A']+1): lbls.append('LV #'+str(a)) r2pvy_pd=pd.DataFrame(r2pvy,index=mmvm_obj['varidY'],columns=lbls) fig1,ax1=plt.subplots() r2pvy_pd.plot(kind='bar', stacked=True,ax=ax1) ax1.set_ylabel('$R^2$pvY') ax1.set_title('$R^2$ per LV for Y-Space') fig1.tight_layout()
[docs] def mpca(xbatch,a,*,unfolding='batch wise',phase_samples=False,cross_val=0): """Fit a Multi-way PCA (MPCA) model to aligned batch data. Unfolds the batch data into a 2-D matrix (batch-wise or variable-wise) and fits a PCA model. Low-variance columns are removed automatically and their positions are restored in the model loadings for consistent interpretation. Args: xbatch (pd.DataFrame): Aligned batch data, all batches having the same number of samples. First column is batch ID; second column may optionally be a phase label; remaining columns are process variables. Batches are stacked vertically. a (int): Number of principal components to fit. unfolding (str): Unfolding strategy. ``'batch wise'`` (default) unfolds to one row per batch; ``'variable wise'`` keeps the observation-per-sample structure. phase_samples (dict or bool): Phase structure stored in the model for use in plotting functions. Pass the same ``nsamples`` dict used for alignment. Default ``False``. cross_val (int): Cross-validation percentage of elements to remove per round. ``0`` (default) = no CV; ``100`` = leave-one-out. Returns: dict: Multi-way PCA model object extending the standard PCA dict from :func:`pyphi.calc.pca` with additional batch-specific keys: - ``'varidX'`` (list[str]): Variable column names in unfolded order. - ``'bid'`` (list[str]): Block ID for each column (original variable name). - ``'uf'`` (str): Unfolding strategy used (``'batch wise'``). - ``'phase_samples'``: Phase structure passed in (for plotting). - ``'nvars'`` (int): Number of process variables per sample. - ``'nbatches'`` (int): Number of batches in the training set. - ``'nsamples'`` (int): Number of samples per batch. - ``'ninit'`` (int): Number of initial-condition variables (always ``0`` for MPCA). - ``'A'`` (int): Number of principal components fitted. """ if (xbatch.columns[1]=='PHASE') or \ (xbatch.columns[1]=='phase') or \ (xbatch.columns[1]=='Phase'): nvars = xbatch.shape[1]-2 else: nvars = xbatch.shape[1]-1 nbatches = len(np.unique(xbatch[xbatch.columns[0]])) nsamples = xbatch.shape[0]/nbatches if unfolding=='batch wise': # remove low variance columns keeping record of the original order x_uf_,colnames,bid_o = unfold_horizontal(xbatch) # colnames is original set of columns x_uf,colsrem = phi.clean_low_variances(x_uf_,shush=True) # colsrem are columns removed mx_rem=x_uf_[colsrem].mean().tolist() mx_rem=np.array(mx_rem) mpca_obj=phi.pca(x_uf,a,cross_val=cross_val) if len(colsrem)>0: xtra_col = np.zeros((2+2*a,1 )) xtra_col[0] = 1 xtra_cols = np.tile(xtra_col,(1,len(colsrem))) xtra_cols[1,:] = mx_rem aux = np.vstack((mpca_obj['sx'],mpca_obj['mx'],mpca_obj['P'].T,mpca_obj['r2xpv'].T)) aux = np.hstack((aux,xtra_cols)) all_cols = x_uf.columns[1:].tolist() all_cols.extend(colsrem) aux_pd = pd.DataFrame(aux,columns=all_cols) aux_pd = aux_pd[colnames] aux_new = aux_pd.values sx_ = aux_new[0,:].reshape(1,-1) mpca_obj['sx'] = sx_ mx_ = aux_new[1,:].reshape(1,-1) mpca_obj['mx'] = mx_ aux_new = aux_new[2:,:] p_ = aux_new[0:a,:] mpca_obj['P'] = p_.T r2xpv_ = aux_new[a:,:] mpca_obj['r2xpv'] = r2xpv_.T mpca_obj['varidX']= colnames mpca_obj['bid'] = bid_o mpca_obj['uf'] ='batch wise' mpca_obj['phase_samples'] = phase_samples mpca_obj['nvars'] = int(nvars) mpca_obj['nbatches'] = int(nbatches) mpca_obj['nsamples'] = int(nsamples) mpca_obj['ninit'] = 0 mpca_obj['A'] = a elif unfolding=='variable wise': if (xbatch.columns[1]=='PHASE') or \ (xbatch.columns[1]=='phase') or \ (xbatch.columns[1]=='Phase'): xbatch_=xbatch.copy() xbatch_.drop(xbatch.columns[1],axis=1,inplace=True) else: xbatch_=xbatch.copy() xbatch_,colsrem = phi.clean_low_variances(xbatch_) # colsrem are columns removed xbatch_,rowsrem = phi.clean_empty_rows(xbatch_) mpca_obj=phi.pca(xbatch_,a) mpca_obj['uf'] ='variable wise' mpca_obj['phase_samples'] = phase_samples mpca_obj['nvars'] = nvars mpca_obj['nbatches'] = nbatches mpca_obj['nsamples'] = nsamples mpca_obj['ninit'] = 0 mpca_obj['A'] = a else: mpca_obj=[] return mpca_obj
def _mimic_monitoring(mmvm_obj_f,bdata,which_batch,*,zinit=False,shush=False,soft_sensor=False): if (not(isinstance(zinit,bool)) and mmvm_obj_f['ninit']==0) | \ ((isinstance(zinit,bool)) and mmvm_obj_f['ninit']>0): print ('Model and data do not correspond') else: T_mon = [] ht2_mon = [] spe_mon = [] spei_mon = [] cont_spei = [] cont_spe = [] cont_ht2 = [] forecast = [] if 'Q' in mmvm_obj_f: forecast_y=[] this_batch=bdata[bdata[bdata.columns[0]]==which_batch] if (bdata.columns[1]=='PHASE') or \ (bdata.columns[1]=='phase') or \ (bdata.columns[1]=='Phase'): colnames=this_batch.columns[2:].tolist() else: colnames=this_batch.columns[1:].tolist() ss_indx=[] #Make soft_sensor var all missing data if not(isinstance(soft_sensor,bool)): if isinstance(soft_sensor,list): for v in soft_sensor: if not(v in colnames): print('Soft sensor '+v+' is not a variable in this dataset') else: ss_indx.append(colnames.index(v)) elif isinstance(soft_sensor,str): if not(soft_sensor in colnames): print('Soft sensor '+soft_sensor+' is not a variable in this dataset') else: ss_indx=[colnames.index(soft_sensor)] if not(isinstance(zinit,bool)): this_z = zinit[zinit[zinit.columns[0]]==which_batch] vals_z = this_z.values[:,1:] cols_z = this_z.columns[1:] vals_z = np.array(vals_z,dtype=np.float64) vals_z = vals_z.reshape(-1) if (bdata.columns[1]=='PHASE') or \ (bdata.columns[1]=='phase') or \ (bdata.columns[1]=='Phase'): vals=this_batch.values[:,2:] else: vals=this_batch.values[:,1:] vals=np.array(vals,dtype=np.float64) if len(ss_indx)>0: vals[:,ss_indx]=np.nan if not(shush): print('Running batch: '+which_batch) if 'Q' in mmvm_obj_f: forecast_y=[] forecast_y_=[] for k in np.arange(mmvm_obj_f['nsamples']): x_uf_k = vals[:k+1,:].reshape(1,-1) x_uf_k = x_uf_k[0].tolist() num_nans = mmvm_obj_f['nvars']*mmvm_obj_f['nsamples']-len(x_uf_k) x_uf_k.extend([np.nan]*num_nans ) x_uf_k = np.array(x_uf_k) if 'Q' in mmvm_obj_f: if not(isinstance(zinit,bool)) and mmvm_obj_f['ninit']>0: x_2_pred = np.hstack((vals_z,x_uf_k)) preds = phi.pls_pred(x_2_pred, mmvm_obj_f) elif (isinstance(zinit,bool)) and mmvm_obj_f['ninit']==0: preds = phi.pls_pred(x_uf_k, mmvm_obj_f) else: print('Model and data do not correspond') preds = False else: preds = phi.pca_pred(x_uf_k, mmvm_obj_f) T_mon.append(preds['Tnew'][0]) ht2_mon.append(preds['T2'][0]) #spe_mon.append(preds['speX'][0]) if 'Q' in mmvm_obj_f: forecast_y_.append(preds['Yhat'][0].reshape(-1)) if not(isinstance(zinit,bool)) and mmvm_obj_f['ninit']>0: ninit=mmvm_obj_f['ninit'] preds_x = preds['Xhat'][0,ninit:] preds_z = preds['Xhat'][0,:ninit] aux_df=pd.DataFrame(preds_x.reshape(mmvm_obj_f['nsamples'],-1),columns=colnames) forecast.append(aux_df) inst_preds = preds_x.reshape(-1) inst_preds = (inst_preds - mmvm_obj_f['mx'][ninit:] )/mmvm_obj_f['sx'][ninit:] #mean center current sample and current prediction for instantaneous SPE x_uf_k = (x_uf_k - mmvm_obj_f['mx'][ninit:] )/mmvm_obj_f['sx'][ninit:] else: aux_df=pd.DataFrame(preds['Xhat'].reshape(mmvm_obj_f['nsamples'],-1),columns=colnames) forecast.append(aux_df) inst_preds = preds['Xhat'].reshape(-1) inst_preds = (inst_preds - mmvm_obj_f['mx'])/mmvm_obj_f['sx'] #mean center current sample and current prediction for instantaneous SPE x_uf_k = (x_uf_k - mmvm_obj_f['mx'])/mmvm_obj_f['sx'] else: aux_df=pd.DataFrame(preds['Xhat'].reshape(mmvm_obj_f['nsamples'],-1),columns=colnames) forecast.append(aux_df) inst_preds = preds['Xhat'].reshape(-1) inst_preds = (inst_preds - mmvm_obj_f['mx'])/mmvm_obj_f['sx'] #mean center current sample and current prediction for instantaneous SPE x_uf_k = (x_uf_k - mmvm_obj_f['mx'])/mmvm_obj_f['sx'] inst_preds[np.isnan(x_uf_k)] = 0 x_uf_k[np.isnan(x_uf_k)] = 0 if not(isinstance(zinit,bool)) and mmvm_obj_f['ninit']>0: cont_ht2_=np.zeros(len(x_2_pred)) else: cont_ht2_=np.zeros(len(x_uf_k)) var_t=np.var(mmvm_obj_f['T'],ddof=1,axis=0) for a in np.arange(mmvm_obj_f['A']): if 'Q' in mmvm_obj_f: if not(isinstance(zinit,bool)) and mmvm_obj_f['ninit']>0: x_2_pred_= (x_2_pred- mmvm_obj_f['mx'])/mmvm_obj_f['sx'] cont_ht2_+= (x_2_pred_ * mmvm_obj_f['Ws'][:,a])**2 / var_t[a] else: cont_ht2_+= (x_uf_k * mmvm_obj_f['Ws'][:,a])**2 / var_t[a] else: cont_ht2_+= (x_uf_k * mmvm_obj_f['P'][:,a])**2 / var_t[a] if not(isinstance(zinit,bool)) and mmvm_obj_f['ninit']>0: conts_ht2_z = cont_ht2_[:ninit] aux_df=pd.DataFrame(cont_ht2_[ninit:].reshape(mmvm_obj_f['nsamples'],-1),columns=colnames) cont_ht2.append(aux_df) else: aux_df=pd.DataFrame(cont_ht2_.reshape(mmvm_obj_f['nsamples'],-1),columns=colnames) cont_ht2.append(aux_df) spe_ = (inst_preds - x_uf_k)**2 aux_df = pd.DataFrame(spe_.reshape(mmvm_obj_f['nsamples'],-1),columns=colnames) cont_spe.append(aux_df) spe_mon.append(np.sum(spe_)) inst_samp = x_uf_k[k*mmvm_obj_f['nvars']:(k+1)*mmvm_obj_f['nvars']] inst_preds = inst_preds[k*mmvm_obj_f['nvars']:(k+1)*mmvm_obj_f['nvars']] spei_ = (inst_preds - inst_samp)**2 cont_spei.append(spei_) spei_mon.append(np.sum(spei_) ) diags={'Batch':which_batch,'t_mon':np.array(T_mon),'HT2_mon':np.array(ht2_mon), 'spe_mon':np.array(spe_mon).reshape(-1),'cont_spe':cont_spe, 'spei_mon':np.array(spei_mon),'cont_spei':pd.DataFrame(np.array(cont_spei),columns=colnames), 'cont_ht2':cont_ht2,'forecast':forecast} if 'Q' in mmvm_obj_f: forecast_y=pd.DataFrame(np.array(forecast_y_),columns=mmvm_obj_f['varidY']) diags['forecast y']=forecast_y if not(isinstance(zinit,bool)) and mmvm_obj_f['ninit']>0: vals_z = (vals_z - mmvm_obj_f['mx'][:ninit] )/mmvm_obj_f['sx'][:ninit] preds_z_ = (preds_z - mmvm_obj_f['mx'][:ninit] )/mmvm_obj_f['sx'][:ninit] spe_z = (vals_z - preds_z_) cont_spe_z = spe_z**2 spe_z = np.sum(cont_spe_z) cont_spe_z = pd.DataFrame(cont_spe_z,index=cols_z,columns=['Vars']) conts_ht2_z =pd.DataFrame(conts_ht2_z,index=cols_z,columns=['Vars']) diags['reconstructed z'] = preds_z diags['spe z'] = spe_z diags['cont_spe_z'] = cont_spe_z diags['cont_ht2_z'] = conts_ht2_z return diags
[docs] def monitor(mmvm_obj,bdata,*,which_batch=False,zinit=False,build_ci=True,shush=False,soft_sensor=False): """Mimic real-time batch monitoring and produce dynamic diagnostic plots. Two-stage workflow: **Stage 1 — Build confidence intervals** (call once after fitting):: monitor(mmvm_obj, training_data) Simulates monitoring for every batch in ``bdata``, computes per-sample confidence intervals for scores, HT², global SPE, and instantaneous SPE, and writes them back into ``mmvm_obj`` in place. **Stage 2 — Monitor a new batch** (call after Stage 1):: diags = monitor(mmvm_obj, bdata, which_batch='Batch01') Simulates real-time monitoring for the specified batch, plots dynamic score, HT², SPE, and (for PLS models) Y-forecast trajectories with the Stage 1 confidence interval overlays. Args: mmvm_obj (dict): Multi-way PCA or PLS model from :func:`mpca` or :func:`mpls`. Confidence interval keys are added in Stage 1. bdata (pd.DataFrame): Batch data (aligned, same structure as training data). Used to look up batch trajectories. which_batch (str, list[str], or bool): Batch ID(s) to monitor. If ``False`` (default), Stage 1 is performed (CI building). zinit (pd.DataFrame or bool): Initial-condition data for the batch(es) being monitored. Required if the model was fitted with ``zinit``. Default ``False``. build_ci (bool): If ``True`` (default) and ``which_batch=False``, builds and stores confidence intervals in ``mmvm_obj``. shush (bool): If ``True``, suppresses progress messages. Default ``False``. soft_sensor (str, list[str], or bool): Variable name(s) to treat as soft-sensor targets — their measurements are set to NaN before prediction so that only model-based estimates are produced. Default ``False``. Returns: dict or str: In Stage 2, returns a ``diags`` dictionary (or a list of dicts if multiple batches are requested) with keys: - ``'Batch'`` (str): Batch ID. - ``'t_mon'`` (ndarray): Score trajectories (nsamples × A). - ``'HT2_mon'`` (ndarray): Hotelling's T² trajectory. - ``'spe_mon'`` (ndarray): Global SPE trajectory. - ``'spei_mon'`` (ndarray): Instantaneous SPE trajectory. - ``'cont_spe'`` (list[pd.DataFrame]): SPE contributions per sample. - ``'cont_spei'`` (pd.DataFrame): Instantaneous SPE contributions. - ``'cont_ht2'`` (list[pd.DataFrame]): HT² contributions per sample. - ``'forecast'`` (list[pd.DataFrame]): X-space forecast per sample. - ``'forecast y'`` (pd.DataFrame): Y forecast trajectory (PLS models only). - ``'spe z'``, ``'cont_spe_z'``, ``'cont_ht2_z'``, ``'reconstructed z'``: Initial-condition diagnostics (if ``zinit`` was provided). Returns ``'error batch not found'`` if the requested batch is not in ``bdata``. In Stage 1, returns ``None`` (results written to ``mmvm_obj``). """ mmvm_obj_f = mmvm_obj.copy() mmvm_obj_f = _uf_hor_mon_loadings(mmvm_obj_f) mmvm_obj_f['mx'] = mmvm_obj_f['mx_ufm'] mmvm_obj_f['sx'] = mmvm_obj_f['sx_ufm'] mmvm_obj_f['P'] = mmvm_obj_f['P_ufm'] aux=bdata.drop_duplicates(subset=bdata.columns[0],keep='first') unique_batches=aux[aux.columns[0]].values.tolist() if 'Q' in mmvm_obj: mmvm_obj_f['W'] = mmvm_obj_f['W_ufm'] mmvm_obj_f['Ws'] = mmvm_obj_f['Ws_ufm'] if isinstance(which_batch,bool) and build_ci: if mmvm_obj['nbatches']==len(np.unique(bdata[bdata.columns[0]]) ): SPE = [] SPEi = [] T = np.zeros((mmvm_obj['nsamples'],mmvm_obj['A'],mmvm_obj['nbatches'])) #build confidence intervals and update model if not(shush): print('Building real_time confidence intervals') for i,b in enumerate(unique_batches): diags = _mimic_monitoring(mmvm_obj_f,bdata,b,zinit=zinit,soft_sensor=soft_sensor) SPE.append(diags['spe_mon']) SPEi.append(diags['spei_mon']) T[:,:,i]=diags['t_mon'] #calculate conf int for scores t_mon_ci_95 = [] t_mon_ci_99 = [] for a in np.arange(mmvm_obj['A']): t_rt_ci_95 = [] t_rt_ci_99 = [] t = T[:,a,:].T for j in np.arange(mmvm_obj_f['nsamples']): l95,l99 = phi.single_score_conf_int(t[:,[j]]) t_rt_ci_95.append(l95) t_rt_ci_99.append(l99) t_mon_ci_95.append(np.array(t_rt_ci_95)) t_mon_ci_99.append(np.array(t_rt_ci_99)) #calculate conf. int for spe and HT2 #HT2 limits n = mmvm_obj['nbatches'] A= mmvm_obj['A'] ht2_mon_ci_99 = (((n-1)*(n+1)*A)/(n*(n-A)))*phi.f99(A,(n-A)) ht2_mon_ci_95 = (((n-1)*(n+1)*A)/(n*(n-A)))*phi.f95(A,(n-A)) spe_mon_ci_95 = [] spe_mon_ci_99 = [] spei_mon_ci_95 = [] spei_mon_ci_99 = [] SPE = np.array(SPE) SPEi = np.array(SPEi) for j in np.arange(mmvm_obj_f['nsamples']): l95,l99=phi.spe_ci(SPE[:,[j]]) spe_mon_ci_95.append(l95) spe_mon_ci_99.append(l99) l95,l99=phi.spe_ci(SPEi[:,[j]]) spei_mon_ci_95.append(l95) spei_mon_ci_99.append(l99) mmvm_obj['t_mon_ci_95'] = t_mon_ci_95 mmvm_obj['t_mon_ci_99'] = t_mon_ci_99 mmvm_obj['ht2_mon_ci_99'] = ht2_mon_ci_99 mmvm_obj['ht2_mon_ci_95'] = ht2_mon_ci_95 mmvm_obj['spe_mon_ci_95'] = spe_mon_ci_95 mmvm_obj['spe_mon_ci_99'] = spe_mon_ci_99 mmvm_obj['spei_mon_ci_95'] = spei_mon_ci_95 mmvm_obj['spei_mon_ci_99'] = spei_mon_ci_99 if not(shush): print('Done') #return mmvm_obj else: print("This ain't the data this model was trained on") else: if not('t_mon_ci_95' in mmvm_obj): print('No monitoring conf. int. have been calculated') print('for this model object please run: "monitor(model_obj,training_data)" ') has_ci=False else: has_ci=True if isinstance(which_batch,str): which_batch=[which_batch] diags=[] allok=True for b in which_batch: if b in bdata[bdata.columns[0]].values.tolist(): #run monitoring on a batch diags_ =_mimic_monitoring(mmvm_obj_f,bdata,b,zinit=zinit,soft_sensor=soft_sensor) diags.append(diags_) else: print('Batch not found in data set') allok=False if allok: #plot scores for a in np.arange(mmvm_obj['A']): plt.figure() for i,b in enumerate(which_batch): x_axis=np.arange(len(diags[i]['t_mon'][:,[a]] ) )+1 plt.plot(x_axis,diags[i]['t_mon'][:,[a]],'o',label=b) if has_ci: plt.plot(x_axis,mmvm_obj['t_mon_ci_95'][a],'y',alpha=0.3) plt.plot(x_axis,-mmvm_obj['t_mon_ci_95'][a],'y',alpha=0.3) plt.plot(x_axis,mmvm_obj['t_mon_ci_99'][a],'r',alpha=0.3) plt.plot(x_axis,-mmvm_obj['t_mon_ci_99'][a],'r',alpha=0.3) plt.xlabel('sample') plt.ylabel('$t_'+str(a+1)+'$') plt.title('Real time monitoring: Score plot $t_'+str(a+1)+'$') box = plt.gca().get_position() plt.gca().set_position([box.x0, box.y0, box.width * 0.8, box.height]) plt.legend(loc='center left', bbox_to_anchor=(1, 0.5)) #plot ht2 plt.figure() for i,b in enumerate(which_batch): plt.plot(x_axis,diags[i]['HT2_mon'],'o',label=b) xlim_=plt.xlim() if has_ci: plt.plot([0,xlim_[1]],[mmvm_obj['ht2_mon_ci_95'],mmvm_obj['ht2_mon_ci_95']],'y',alpha=0.3) plt.plot([0,xlim_[1]],[mmvm_obj['ht2_mon_ci_99'],mmvm_obj['ht2_mon_ci_99']],'r',alpha=0.3) plt.xlabel('sample') plt.ylabel("Hotelling's $T^2$") plt.title("Real time monitoring: Hotelling's $T^2$") box = plt.gca().get_position() plt.gca().set_position([box.x0, box.y0, box.width * 0.8, box.height]) plt.legend(loc='center left', bbox_to_anchor=(1, 0.5)) #plot spe plt.figure() for i,b in enumerate(which_batch): plt.plot(x_axis,diags[i]['spe_mon'],'o',label=b) xlim_=plt.xlim() if has_ci: plt.plot(x_axis,mmvm_obj['spe_mon_ci_95'],'y',alpha=0.3) plt.plot(x_axis,mmvm_obj['spe_mon_ci_99'],'r',alpha=0.3) plt.xlabel('sample') plt.ylabel("Global SPE") plt.title("Real time monitoring: Global SPE") box = plt.gca().get_position() plt.gca().set_position([box.x0, box.y0, box.width * 0.8, box.height]) plt.legend(loc='center left', bbox_to_anchor=(1, 0.5)) #plot spei plt.figure() for i,b in enumerate(which_batch): plt.plot(x_axis,diags[i]['spei_mon'],'o',label=b) xlim_=plt.xlim() if has_ci: plt.plot(x_axis,mmvm_obj['spei_mon_ci_95'],'y',alpha=0.3) plt.plot(x_axis,mmvm_obj['spei_mon_ci_99'],'r',alpha=0.3) plt.xlabel('sample') plt.ylabel("Instantaneous SPE") plt.title("Real time monitoring: Instantaneous SPE") box = plt.gca().get_position() plt.gca().set_position([box.x0, box.y0, box.width * 0.8, box.height]) plt.legend(loc='center left', bbox_to_anchor=(1, 0.5)) if 'Q' in mmvm_obj: for v in diags[0]['forecast y'].columns: plt.figure() for i,b in enumerate(which_batch): plt.plot(x_axis,diags[i]['forecast y'][v],'o',label=b) plt.xlabel('sample') plt.ylabel(v) plt.title('Dynamic forecast of Y') if len(diags)==1: diags=diags[0] return diags else: return 'error batch not found'
[docs] def mpls(xbatch,y,a,*,zinit=False,phase_samples=False,mb_each_var=False,cross_val=0,cross_val_X=False): """Fit a Multi-way PLS (MPLS) model to aligned batch data. Unfolds the batch data batch-wise, optionally prepends initial-condition variables, and fits a PLS (or Multi-Block PLS) model to predict ``y``. Low-variance columns are removed and their positions are restored for consistent interpretation. Args: xbatch (pd.DataFrame): Aligned batch data, all batches having the same number of samples. First column is batch ID; second column may optionally be a phase label; remaining columns are process variables. Batches are stacked vertically. y (pd.DataFrame or np.ndarray): Response matrix, one row per batch. If a DataFrame, the first column is the batch ID. a (int): Number of latent variables. zinit (pd.DataFrame or bool): Initial-condition variables, one row per batch. First column must be batch ID. If ``False`` (default), no initial conditions are used. phase_samples (dict or bool): Phase structure stored in the model for use in plotting functions. Default ``False``. mb_each_var (bool): If ``True``, treats each process variable as a separate block in a Multi-Block PLS model. If ``False`` (default), trajectories form a single block (plus an initial-conditions block if ``zinit`` is provided). cross_val (int): Cross-validation level (``0`` = none, ``100`` = LOO). Default ``0``. cross_val_X (bool): If ``True``, also cross-validates the X-space. Default ``False``. Returns: dict: Multi-way PLS model object extending the standard PLS dict from :func:`pyphi.calc.pls` (or :func:`pyphi.calc.mbpls`) with additional batch-specific keys: - ``'Yhat'`` (np.ndarray): In-sample Y predictions. - ``'varidX'`` (list[str]): Variable column names in unfolded order. - ``'bid'`` (list[str]): Block ID for each column. - ``'uf'`` (str): ``'batch wise'``. - ``'nvars'`` (int): Number of process variables per sample. - ``'nbatches'`` (int): Number of batches in the training set. - ``'nsamples'`` (int): Number of samples per batch. - ``'A'`` (int): Number of latent variables fitted. - ``'phase_samples'``: Phase structure passed in. - ``'mb_each_var'`` (bool): Whether MB-PLS was used. - ``'ninit'`` (int): Number of initial-condition variables (``0`` if ``zinit`` was not provided). """ if (xbatch.columns[1]=='PHASE') or \ (xbatch.columns[1]=='phase') or \ (xbatch.columns[1]=='Phase'): nvars = xbatch.shape[1]-2 else: nvars = xbatch.shape[1]-1 nbatches = len(np.unique(xbatch[xbatch.columns[0]])) nsamples = xbatch.shape[0]/nbatches # remove low variance columns keeping record of the original order x_uf_,colnames,bid_o = unfold_horizontal(xbatch) # colnames is original set of columns x_uf,colsrem = phi.clean_low_variances(x_uf_,shush=True) # colsrem are columns removed mx_rem=x_uf_[colsrem].mean().tolist() mx_rem=np.array(mx_rem) aux = np.array([colnames,bid_o]) col_names_bid_pd = pd.DataFrame(aux.T,columns=['col name','bid']) col_names_bid_pd_ = col_names_bid_pd[col_names_bid_pd['col name'].isin(x_uf.columns[1:].tolist())] #bid = col_names_bid_pd_['bid'].values # bid is vector indicating to what variable each col belongs # # useful in figuring out the blocks aux=col_names_bid_pd_.drop_duplicates(subset='bid',keep='first') unique_bid=aux['bid'].values.tolist() if not(isinstance(zinit,bool)): zinit,rc=phi.clean_low_variances(zinit) zcols=zinit.columns[1:].tolist() XMB={'Initial Conditions':zinit} else: XMB=dict() if mb_each_var: for v in unique_bid: these_cols=[x_uf.columns[0]] these_cols.extend(col_names_bid_pd_['col name'][col_names_bid_pd_['bid']==v].values.tolist()) varblock=x_uf[these_cols] XMB[v]=varblock else: if not(isinstance(zinit,bool)): XMB['Trajectories']=x_uf else: XMB = x_uf if not(isinstance(XMB,dict)): mpls_obj=phi.pls(XMB,y,a,cross_val=cross_val,cross_val_X=cross_val_X,force_nipals=True) yhat=phi.pls_pred(XMB,mpls_obj) yhat=yhat['Yhat'] else: mpls_obj=phi.mbpls(XMB,y,a,cross_val_=cross_val,cross_val_X_=cross_val_X,force_nipals_=True) yhat=phi.pls_pred(XMB,mpls_obj) yhat=yhat['Yhat'] if len(colsrem)>0: if not(isinstance(zinit,bool)): ninit_vars=zinit.shape[1]-1 z_sx = mpls_obj['sx'][np.arange(ninit_vars)].reshape(-1) z_mx = mpls_obj['mx'][np.arange(ninit_vars)].reshape(-1) z_ws = mpls_obj['Ws'][np.arange(ninit_vars),:] z_w = mpls_obj['W'][np.arange(ninit_vars),:] z_p = mpls_obj['P'][np.arange(ninit_vars),:] z_r2pv = mpls_obj['r2xpv'][np.arange(ninit_vars),:] xc_ = np.arange(ninit_vars,x_uf.shape[1]+ninit_vars-1 ) xuf_sx = mpls_obj['sx'][xc_] xuf_mx = mpls_obj['mx'][xc_] xuf_ws = mpls_obj['Ws'][xc_,:] xuf_w = mpls_obj['W'][xc_,:] xuf_p = mpls_obj['P'][xc_,:] xuf_r2pv = mpls_obj['r2xpv'][xc_,:] else: xuf_sx = mpls_obj['sx'] xuf_mx = mpls_obj['mx'] xuf_ws = mpls_obj['Ws'] xuf_w = mpls_obj['W'] xuf_p = mpls_obj['P'] xuf_r2pv = mpls_obj['r2xpv'] #add removed columns with mean of zero and stdev =1 and loadings = 0 xtra_col = np.zeros((2+4*a,1 )) xtra_col[0] = 1 xtra_cols = np.tile(xtra_col,(1,len(colsrem))) xtra_cols[1,:] = mx_rem aux = np.vstack((xuf_sx,xuf_mx,xuf_ws.T,xuf_w.T,xuf_p.T,xuf_r2pv.T)) aux = np.hstack((aux,xtra_cols)) all_cols = x_uf.columns[1:].tolist() all_cols.extend(colsrem) aux_pd = pd.DataFrame(aux,columns=all_cols) aux_pd = aux_pd[colnames] aux_new = aux_pd.values if not(isinstance(zinit,bool)): sx_ = np.hstack((z_sx,aux_new[0,:].reshape(-1))) mpls_obj['sx'] = sx_ mx_ = np.hstack((z_mx,aux_new[1,:].reshape(-1))) mpls_obj['mx'] = mx_ aux_new = aux_new[2:,:] ws_ = aux_new[0:a,:] mpls_obj['Ws'] = np.vstack((z_ws,ws_.T)) aux_new = aux_new[a:,:] w_ = aux_new[0:a,:] mpls_obj['W'] = np.vstack((z_w,w_.T)) aux_new = aux_new[a:,:] p_ = aux_new[0:a,:] mpls_obj['P'] = np.vstack((z_p,p_.T)) aux_new = aux_new[a:,:] r2xpv_ = aux_new[0:a,:] mpls_obj['r2xpv'] = np.vstack((z_r2pv,r2xpv_.T)) zcols.extend(colnames) colnames=zcols else: sx_ = aux_new[0,:].reshape(1,-1) mpls_obj['sx'] = sx_ mx_ = aux_new[1,:].reshape(1,-1) mpls_obj['mx'] = mx_ aux_new = aux_new[2:,:] ws_ = aux_new[0:a,:] mpls_obj['Ws'] = ws_.T aux_new = aux_new[a:,:] w_ = aux_new[0:a,:] mpls_obj['W'] = w_.T aux_new = aux_new[a:,:] p_ = aux_new[0:a,:] mpls_obj['P'] = p_.T aux_new = aux_new[a:,:] r2xpv_ = aux_new[0:a,:] mpls_obj['r2xpv'] = r2xpv_.T mpls_obj['Yhat'] = yhat mpls_obj['varidX'] = colnames mpls_obj['bid'] = bid_o mpls_obj['uf'] ='batch wise' mpls_obj['nvars'] = int(nvars) mpls_obj['nbatches'] = int(nbatches) mpls_obj['nsamples'] = int(nsamples) mpls_obj['A'] = a mpls_obj['phase_samples'] = phase_samples mpls_obj['mb_each_var'] = mb_each_var if not(isinstance(zinit,bool)): mpls_obj['ninit']=int(zinit.shape[1]-1) else: mpls_obj['ninit']=0 return mpls_obj
[docs] def find(a, func): """Return indices of elements in a list that satisfy a predicate function. Args: a (list): Input list to search. func (callable): A function that takes a single element and returns ``True`` if the element should be included. Example: ``lambda x: x == 0`` finds all zero-valued elements. Returns: list[int]: Indices of elements in ``a`` for which ``func`` returns ``True``. """ return [i for (i, val) in enumerate(a) if func(val)]
[docs] def clean_empty_rows(X,*,shush=False): """Remove rows that are entirely NaN from a batch DataFrame. Args: X (pd.DataFrame): Batch data. First column is batch ID; second column may optionally be a phase label (``'Phase'``, ``'phase'``, or ``'PHASE'``); remaining columns are process variables. shush (bool): If ``True``, suppresses printed output listing removed rows. Default ``False``. Returns: pd.DataFrame: Batch data with fully empty rows removed. Returns the original DataFrame unchanged if no empty rows are found. """ if (X.columns[1]=='PHASE') or \ (X.columns[1]=='phase') or \ (X.columns[1]=='Phase'): X_ = np.array(X.values[:,2:]).astype(float) ObsID_ = X.values[:,0].astype(str) ObsID_ = ObsID_.tolist() else: X_ = np.array(X.values[:,1:]).astype(float) ObsID_ = X.values[:,0].astype(str) ObsID_ = ObsID_.tolist() #find rows with all data missing X_nan_map = np.isnan(X_) Xmiss = X_nan_map*1 Xmiss = np.sum(Xmiss,axis=1) indx = find(Xmiss, lambda x: x==X_.shape[1]) if len(indx)>0: for i in indx: if not(shush): print('Removing row from ', ObsID_[i], ' due to 100% missing data') X_=X.drop(X.index.values[indx].tolist()) return X_ else: return X
[docs] def phase_sampling_dist(bdata,time_column=False,addtitle=False,use_phases=False): """Plot and return the distribution of samples (or time) consumed per phase. Produces a histogram panel — one subplot per phase plus one for the total — showing how many samples (or how much time) each batch spends in each phase. Useful for diagnosing alignment issues and batch variability before fitting a model. Args: bdata (pd.DataFrame): Batch data. First column is batch ID; second column must be a phase label (``'Phase'``, ``'phase'``, or ``'PHASE'``); remaining columns are process variables. Batches are stacked vertically. time_column (str or bool): If a column name is given, the x-axis represents elapsed time in that column rather than sample count. Default ``False``. addtitle (str or bool): Optional string used as the overall figure super-title. Default ``False``. use_phases (list[str] or bool): Subset of phases to include. If ``False`` (default), all phases present in the data are used. Returns: dict: Nested dictionary ``{phase: {batch_id: value}}`` where ``value`` is the sample count or elapsed time for that batch in that phase. """ data={} if bdata.columns[1]=='phase' or bdata.columns[1]=='Phase' or bdata.columns[1]=='PHASE': bids = unique(bdata,bdata.columns[0]) if isinstance(use_phases,bool): phases = unique(bdata,bdata.columns[1]) else: phases = use_phases #samps_per_phase=[] fig,ax=plt.subplots(1,len(phases)+1) for i,p in enumerate(phases): totsamps=[] samps_=[] samps_ind={} for b in bids: bdat= bdata[ (bdata[bdata.columns[1]]==p) & (bdata[bdata.columns[0]]==b)] if len(bdat)==0: samps_.append(0) samps_ind[b]=0 elif not(isinstance(time_column,bool)): samps_.append(bdat[time_column].values[-1]-bdat[time_column].values[0] ) totsamps.append(bdata[time_column][(bdata[bdata.columns[0]]==b)].values[-1]) samps_ind[b]=bdat[time_column].values[-1]-bdat[time_column].values[0] else: samps_.append(len(bdat)) totsamps.append(len(bdata[(bdata[bdata.columns[0]]==b)])) samps_ind[b]=len(bdat) ax[i].hist(samps_) data[p]=samps_ind if not(isinstance(time_column,bool)): ax[i].set_xlabel(time_column) else: ax[i].set_xlabel('# Samples') ax[i].set_ylabel('Count') ax[i].set_title(p) ax[-1].hist(totsamps) if not(isinstance(time_column,bool)): ax[-1].set_xlabel(time_column) else: ax[-1].set_xlabel('# Samples') ax[-1].set_ylabel('Count') ax[-1].set_title('Total') if not(isinstance(addtitle,bool)): fig.suptitle(addtitle) fig.tight_layout() return data else: print('Data is missing phase information or phase column is nor properly labeled')
[docs] def predict(xbatch,mmvm_obj,*,zinit=False): """Generate predictions for all batches in a dataset using a fitted MPCA or MPLS model. Unfolds ``xbatch`` batch-wise, projects through the model, and returns reconstructed X (and predicted Y for PLS models) refolded back to the original batch DataFrame structure. Args: xbatch (pd.DataFrame): Aligned batch data, same variable structure and number of samples per batch as the training data. First column is batch ID; second column may optionally be a phase label. mmvm_obj (dict): Multi-way PCA or PLS model from :func:`mpca` or :func:`mpls`. zinit (pd.DataFrame or bool): Initial-condition data, one row per batch. First column must be batch ID. Required if the model was fitted with initial conditions. Default ``False``. Returns: dict: Prediction results with keys: - ``'Tnew'`` (ndarray): Batch scores (n_batches × A). - ``'Xhat'`` (pd.DataFrame): Reconstructed X in original batch-stacked format (same structure as ``xbatch``). - ``'speX'`` (ndarray): X-space SPE per batch. - ``'T2'`` (ndarray): Hotelling's T² per batch. - ``'Yhat'`` (pd.DataFrame): Predicted Y (PLS models only), one row per batch with batch IDs as the first column. - ``'Zhat'`` (pd.DataFrame): Reconstructed initial conditions (PLS models only, when ``zinit`` is provided). """ if 'Q' in mmvm_obj: x_uf,colnames,bid_o = unfold_horizontal(xbatch) # colnames is original set of columns aux = np.array([colnames,bid_o]) col_names_bid_pd = pd.DataFrame(aux.T,columns=['col name','bid']) #bid = col_names_bid_pd['bid'].values # bid is vector indicating to what variable each col belongs # # useful in figuring out the blocks aux=col_names_bid_pd.drop_duplicates(subset='bid',keep='first') unique_bid=aux['bid'].values.tolist() if not(isinstance(zinit,bool)): XMB={'Initial Conditions':zinit} else: XMB=dict() if mmvm_obj['mb_each_var']: for v in unique_bid: these_cols=[x_uf.columns[0]] these_cols.extend(col_names_bid_pd['col name'][col_names_bid_pd['bid']==v].values.tolist()) varblock=x_uf[these_cols] XMB[v]=varblock else: if not(isinstance(zinit,bool)): XMB['Trajectories']=x_uf else: XMB = x_uf pred=phi.pls_pred(XMB,mmvm_obj) if not(isinstance(zinit,bool)): Zhat=pred['Xhat'][:,:mmvm_obj['ninit']] Xhat=pred['Xhat'][:,mmvm_obj['ninit']:] Xb=refold_horizontal(Xhat,mmvm_obj['nvars'],mmvm_obj['nsamples'] ) if (xbatch.columns[1]=='PHASE') or \ (xbatch.columns[1]=='phase') or \ (xbatch.columns[1]=='Phase'): Xb=pd.DataFrame(Xb,columns=xbatch.columns[2:]) Xb.insert(0,xbatch.columns[1],xbatch[xbatch.columns[1]].values) Xb.insert(0,xbatch.columns[0],xbatch[xbatch.columns[0]].values) else: Xb=pd.DataFrame(Xb,columns=xbatch.columns[1:]) Xb.insert(0,xbatch.columns[0],xbatch[xbatch.columns[0]].values) pred['Xhat']=Xb Zhat_df=pd.DataFrame(Zhat,columns=zinit.columns[1:].tolist()) Zhat_df.insert(0,zinit.columns[0],zinit[zinit.columns[0]].values.astype(str).tolist()) pred['Zhat']=Zhat_df test=xbatch.drop_duplicates(subset=xbatch.columns[0],keep='first') Y_df=pd.DataFrame(pred['Yhat'],columns=mmvm_obj['varidY']) Y_df.insert(0,test.columns[0],test[test.columns[0]].values.astype(str).tolist()) pred['Yhat']=Y_df else: Xb=refold_horizontal(pred['Xhat'],mmvm_obj['nvars'],mmvm_obj['nsamples'] ) if (xbatch.columns[1]=='PHASE') or \ (xbatch.columns[1]=='phase') or \ (xbatch.columns[1]=='Phase'): Xb=pd.DataFrame(Xb,columns=xbatch.columns[2:]) Xb.insert(0,xbatch.columns[1],xbatch[xbatch.columns[1]].values) Xb.insert(0,xbatch.columns[0],xbatch[xbatch.columns[0]].values) else: Xb=pd.DataFrame(Xb,columns=xbatch.columns[1:]) Xb.insert(0,xbatch.columns[0],xbatch[xbatch.columns[0]].values) pred['Xhat']=Xb test=xbatch.drop_duplicates(subset=xbatch.columns[0],keep='first') Y_df=pd.DataFrame(pred['Yhat'],columns=mmvm_obj['varidY']) Y_df.insert(0,test.columns[0],test[test.columns[0]].values.astype(str).tolist()) pred['Yhat']=Y_df else: if mmvm_obj['uf'] =='batch wise': x_uf,colnames,bid_o = unfold_horizontal(xbatch) pred=phi.pca_pred(x_uf,mmvm_obj) Xb=refold_horizontal(pred['Xhat'],mmvm_obj['nvars'],mmvm_obj['nsamples'] ) if (xbatch.columns[1]=='PHASE') or \ (xbatch.columns[1]=='phase') or \ (xbatch.columns[1]=='Phase'): Xb=pd.DataFrame(Xb,columns=xbatch.columns[2:]) Xb.insert(0,xbatch.columns[1],xbatch[xbatch.columns[1]].values) Xb.insert(0,xbatch.columns[0],xbatch[xbatch.columns[0]].values) else: Xb=pd.DataFrame(Xb,columns=xbatch.columns[1:]) Xb.insert(0,xbatch.columns[0],xbatch[xbatch.columns[0]].values) pred['Xhat']=Xb if mmvm_obj['uf'] =='variable wise': if (xbatch.columns[1]=='PHASE') or \ (xbatch.columns[1]=='phase') or \ (xbatch.columns[1]=='Phase'): xbatch_=xbatch.copy() xbatch_.drop(xbatch.columns[1],axis=1,inplace=True) else: xbatch_=xbatch.copy() pred=phi.pca_pred(xbatch_,mmvm_obj) Xb=pred['Xhat'] if (xbatch.columns[1]=='PHASE') or \ (xbatch.columns[1]=='phase') or \ (xbatch.columns[1]=='Phase'): Xb=pd.DataFrame(Xb,columns=xbatch.columns[2:]) Xb.insert(0,xbatch.columns[1],xbatch[xbatch.columns[1]].values) Xb.insert(0,xbatch.columns[0],xbatch[xbatch.columns[0]].values) else: Xb=pd.DataFrame(Xb,columns=xbatch.columns[1:]) Xb.insert(0,xbatch.columns[0],xbatch[xbatch.columns[0]].values) pred['Xhat']=Xb return pred
def _plot_contribs(bdata,ylims,*,var_list=False,phase_samples=False,alpha_=0.2,plot_title_=''): ''' Internal routine please use : contributions ''' if (bdata.columns[1]=='PHASE') or \ (bdata.columns[1]=='phase') or \ (bdata.columns[1]=='Phase'): if isinstance(var_list,bool): var_list=bdata.columns[2:].tolist() else: if isinstance(var_list,bool): var_list=bdata.columns[1:].tolist() if isinstance(var_list,str) and not(isinstance(var_list,list)): var_list=[var_list] for v in var_list: plt.figure() dat=bdata[[bdata.columns[0],v]] for b in np.unique(dat[bdata.columns[0]]): data_=dat[v][dat[dat.columns[0]]==b] plt.fill_between(np.arange(len(data_.values))+1,data_.values) plt.xlabel('Sample') plt.ylabel(v) if not(isinstance(phase_samples,bool)): s_txt=1 s_lin=1 plt.axvline(x=1,color='magenta',alpha=alpha_) for p in phase_samples.keys(): if isinstance(phase_samples[p],list): s_lin+=phase_samples[p][1] else: s_lin+=phase_samples[p] plt.axvline(x=s_lin,color='magenta',alpha=alpha_) ylim_=plt.ylim() plt.annotate(p, (s_txt,ylim_[0]),rotation=90,alpha=0.5,color='magenta') if isinstance(phase_samples[p],list): s_txt+=phase_samples[p][1] else: s_txt+=phase_samples[p] plt.ylim(ylims) plt.title(plot_title_) plt.tight_layout()
[docs] def contributions (mmvmobj,X,cont_type,*,to_obs=False,from_obs=False, lv_space=False,phase_samples=False,dyn_conts=False,which_var=False, plot_title=''): """Plot variable contributions to scores, HT², or SPE for a batch model. Computes and visualises how much each process variable at each time point contributes to the specified monitoring statistic for the given batch(es). Both a summary bar chart (absolute contributions summed over time) and an optional dynamic time-series plot are produced. Args: mmvmobj (dict): Multi-way PCA or PLS model from :func:`mpca` or :func:`mpls`. X (pd.DataFrame): Batch data, same structure as the training data (aligned, batch-wise stacked). cont_type (str): Type of contribution to compute. Options: ``'scores'``, ``'ht2'``, ``'spe'``. to_obs (list[str] or bool): Batch ID(s) to diagnose. This argument is required. Default ``False``. from_obs (list[str] or bool): Reference batch ID(s) for difference-based contributions (``'scores'`` and ``'ht2'`` only). If ``False`` (default), the model origin is used as the reference. Ignored for ``'spe'``. lv_space (int, list[int], or bool): Component index/indices to compute contributions for (``'scores'`` only). If ``False`` (default), contributions are summed across all components. phase_samples (dict or bool): Phase structure for annotating phase boundaries in dynamic contribution plots. Default ``False``. dyn_conts (bool): If ``True``, also produces dynamic time-series contribution plots (one per variable) in addition to the summary bar chart. Default ``False``. which_var (str, list[str], or bool): Variables to include in the dynamic contribution plots (only used when ``dyn_conts=True``). If ``False`` (default), all variables are shown. plot_title (str): Title applied to all figures. Default ``''``. Returns: None: Displays Matplotlib figures (bar chart always; time-series plots if ``dyn_conts=True``). """ #translate from batch numbers to indexes #Check logic and warn user if (X.columns[1]=='PHASE') or \ (X.columns[1]=='phase') or \ (X.columns[1]=='Phase'): bvars = X.columns[2:] else: bvars = X.columns[1:] allok=True if isinstance(to_obs,bool): print('Contribution calculations need a "to_obs" argument at minimum') allok=False if cont_type=='spe' and not(isinstance(from_obs,bool)): print('For SPE contributions the "from_obs" argument is ignored') if (isinstance(to_obs,list) and len(to_obs)>1) or (isinstance(from_obs,list) and len(from_obs)>1): print('Contributions for groups of observations will be averaged') if (cont_type=='scores') and isinstance(lv_space,bool): print('No dimensions specified, doing contributions across all dimensions') if allok: #find indexes to batch names Xuf,var1,var2=unfold_horizontal(X) bnames=Xuf[Xuf.columns[0]].values.tolist() to_o=[bnames.index(to_obs[i]) for i in np.arange(len(to_obs))] if (cont_type=='scores') or (cont_type=='ht2'): if not(isinstance(from_obs,bool)): from_o = [bnames.index(from_obs[i]) for i in np.arange(len(from_obs))] else: from_o =False cont_vec=phi.contributions(mmvmobj,Xuf,cont_type,Y=False,from_obs=from_o,to_obs=to_o,lv_space=lv_space) elif cont_type=='spe': cont_vec=phi.contributions(mmvmobj,Xuf,cont_type,Y=False,from_obs=False,to_obs=to_o) ymin=cont_vec.min() ymax=cont_vec.max() #Plot contributions if dyn_conts: batch_contribs=refold_horizontal(cont_vec, mmvmobj['nvars'],mmvmobj['nsamples']) batch_contribs = pd.DataFrame(batch_contribs,columns=bvars) oids=['Batch Contributions']*batch_contribs.shape[0] batch_contribs.insert(0,'BatchID',oids) _plot_contribs(batch_contribs,(ymin, ymax),phase_samples=phase_samples,var_list=which_var,plot_title_=plot_title) batch_contribs=refold_horizontal(abs(cont_vec), mmvmobj['nvars'],mmvmobj['nsamples']) batch_contribs=np.sum(batch_contribs,axis=0) plt.figure() plt.bar(bvars, batch_contribs) plt.xticks(rotation=75) plt.ylabel(r'$\Sigma$ (|Contribution to '+cont_type+'| )' ) plt.title(plot_title) plt.tight_layout()
[docs] def build_rel_time(bdata,*,time_unit='min'): """Convert a ``'Timestamp'`` column to relative elapsed time from batch start. For each batch, computes elapsed time since the first timestamp and adds it as a new ``'Time (<unit>)'`` column, replacing the original ``'Timestamp'`` column. Args: bdata (pd.DataFrame): Batch data containing a ``'Timestamp'`` column with datetime-compatible values. First column is batch ID. time_unit (str): Unit for the output time column. ``'min'`` (default) produces minutes; ``'hr'`` produces hours; ``'s'`` keeps seconds. Returns: pd.DataFrame: Batch data with ``'Timestamp'`` replaced by ``'Time (<time_unit>)'``, where values are elapsed time from the start of each batch. """ aux=bdata.drop_duplicates(subset=bdata.columns[0],keep='first') unique_batches=aux[aux.columns[0]].values.tolist() bdata_n=[] for b in unique_batches: this_batch=bdata[bdata[bdata.columns[0]]==b] ts=this_batch['Timestamp'].values ts=pd.to_datetime(ts) deltat=ts-ts[0] tim=[np.timedelta64(deltat[i],'s').astype(float) for i in np.arange(len(deltat))] tim=np.array(tim) if time_unit == 'min': tim=tim/60 if time_unit == 'hr': tim=tim/3600 cols=this_batch.columns.tolist() this_batch.insert(cols.index('Timestamp'),'Time ('+time_unit+')' ,tim) this_batch=this_batch.drop('Timestamp',axis=1) bdata_n.append(this_batch) bdata_n = pd.concat(bdata_n,axis=0) return bdata_n
[docs] def descriptors(bdata,which_var,desc,*,phase=False): """Compute summary descriptors for batch trajectories, optionally per phase. Calculates one or more statistical descriptors for each variable in each batch, returning a single row per batch suitable for use as input to a PLS or PCA model. Args: bdata (pd.DataFrame): Batch data. First column is batch ID; second column may optionally be a phase label (``'Phase'``, ``'phase'``, or ``'PHASE'``); remaining columns are process variables. which_var (list[str]): Variable names to compute descriptors for. desc (list[str]): Descriptor types to calculate. Supported values: - ``'min'``: Minimum value. - ``'max'``: Maximum value. - ``'mean'``: Arithmetic mean. - ``'median'``: Median value. - ``'std'``: Standard deviation (ddof=1). - ``'var'``: Variance (ddof=1). - ``'range'``: Max minus min. - ``'ave_slope'``: Average linear slope (estimated via least squares). phase (list[str] or bool): If a list of phase names is provided, descriptors are computed separately within each phase, and column names are suffixed with ``'_<phase>_<descriptor>'``. If ``False`` (default), descriptors are computed over the full batch trajectory. Returns: pd.DataFrame: One row per batch, first column is batch ID, remaining columns are descriptor values named ``'<variable>_<phase>_<descriptor>'`` (with phase) or ``'<variable>_<descriptor>'`` (without phase). """ has_phase=False if ((bdata.columns[1]=='Phase') or (bdata.columns[1]=='phase') or (bdata.columns[1]=='PHASE')): has_phase=True phase_col=bdata.columns[1] if not(has_phase) and not(isinstance(phase,bool)): print('Cannot process phase flag without phase id in data') phase=False desc_val=[] desc_lbl=[] for i,b in enumerate(unique(bdata,bdata.columns[0])): this_batch=bdata[bdata[bdata.columns[0]]==b] desc_val_=[] for v in which_var: if not(isinstance(phase,bool)): # by phase for p in phase: values=this_batch[v][this_batch[phase_col]==p].values.astype(float) values=values[~(np.isnan(values))] for d in desc: if d=='min': desc_val_.append(values.min()) if d=='max': desc_val_.append(values.max()) if d=='mean': desc_val_.append(values.mean()) if d=='median': desc_val_.append(np.median(values)) if d=='std': desc_val_.append(np.std(values,ddof=1)) if d=='var': desc_val_.append(np.var(values,ddof=1)) if d=='range': desc_val_.append(values.max()-values.min() ) if d=='ave_slope': x=np.arange(0,len(values)) m=(1/sum(x**2))*sum(x*(values-values[0])) desc_val_.append(m) if i==0: desc_lbl.append(v +'_'+ p +'_'+ d) else: values=this_batch[v].values.astype(float) values=values[~(np.isnan(values))] for d in desc: if d=='min': desc_val_.append(values.min()) if d=='max': desc_val_.append(values.max()) if d=='mean': desc_val_.append(values.mean()) if d=='median': desc_val_.append(np.median(values)) if d=='std': desc_val_.append(np.std(values,ddof=1)) if d=='var': desc_val_.append(np.var(values,ddof=1)) if d=='range': desc_val_.append(values.max()-values.min() ) if d=='ave_slope': x=np.arange(0,len(values)) m=(1/sum(x**2))*sum(x*(values-values[0])) desc_val_.append(m) if i==0: desc_lbl.append(v +'_' + d) desc_val.append(desc_val_) bnames = unique(bdata,bdata.columns[0]) desc_val=np.array(desc_val) descriptors_df=pd.DataFrame(desc_val,columns=desc_lbl ) descriptors_df.insert(0,bdata.columns[0],bnames ) return descriptors_df