'''
Koopman Analysis
================
Routines and methods for Koopman analysis
This module contains the tools for performing a Koopman or Perron decomposition on
time series of vector data. It is based on Klus routines in module `Klus`.
Classes
-------
| **Koop** Transfer Operator
| **KoopGen** Generator Transfer Operator
Examples
--------
>>> options = {'operator': 'Koopman',
>>> 'kernel_choice': 'poly', 'k_shift':0.1,
>>> 'poly_shift':0,'poly_order':1,
>>> 'bandwidth': 'std',
>>> 'epsilon':1e-5,'time_data': X.A.time.data[:-1]}
>>> KK=zkop.Koop(vdat,**options)
>>> KK.fit(bandwidth='std')
'''
from distutils import file_util
import math
from tkinter.tix import Select
#
import numpy as np
import xarray as xr
from sklearn.neighbors import KernelDensity
import klus.algorithms as al
import klus.kernels as kernels
import klus.tools as tools
import scipy.linalg as sc
import numpy.linalg as lin
from scipy.spatial import distance
#
import tabulate as tab
[docs]
def choose_eig(option,ww,tol,dt):
'''
Choose stable eigenvalues, either according their absolute value
or according the sign of the real part
Parameters
----------
ww: complex
Koopman Eigenvalues
option:
* `one` Eigenvalues close to one in Abs less than `tol`
* `stable` Real part < 0
* `unstable` Real part > 0
tol:
Tolerance
dt:
Time interval used for the Koopman eigenvalues calculation
Returns
-------
ind:
Index of selected eigenvalues
'''
print(f'Choose eigenvalues according to option {option}')
if option == 'one':
ind = np.where(np.abs(1-np.abs(ww)) < tol)[0]
elif option == 'stable':
wlog = np.log(ww)/dt
ind= np.where(wlog.real < tol )[0]
elif option == 'unstable':
wlog = np.log(ww)/dt
ind = np.where(wlog.real > tol )[0]
else:
raise SystemExit(f'Wrong option {option} in Choose_eig')
return ind
[docs]
def reconstruct_koopman(w,phi0,v,dt,n,decode=None):
'''
Linear decomposition with Koopman modes
Parameters
----------
w:
Koopman Generator Eigenvalues
phi0:
Initial values for the eigenfunctions
v:
Koopman modes
dt:
Timesteps
n:
Number of time steps
decode:
If not empty contains the array for decoding
to physical space
Returns
-------
x:
Predicted values
'''
#Choose only eigenfunctions close to 1 and dimensions
time_range = np.arange(n)
Ntime = n
try:
Nmode = len(w)
except:
Nmode = 1
#Time Matrix
D = np.zeros([Nmode,Ntime],dtype='complex')
for i in range(Ntime):
D[:,i] = np.exp(w*i*dt)
try:
Dphi = np.diag(phi0)
except:
Dphi = phi0
x = v.T@Dphi@D
if decode is None:
y = x
else:
y = decode@x
return y.real
[docs]
def check_sim(ax,X,tit,option='std'):
'''
Compute scale for kernel
Parameters
----------
ax :
Axes for histogram plot
X :
Data Matrix
tit :
Title for histogram
option:
Type of scale
* 'std' Standard deviation
* 'median' Median
Return
------
scale : float
Scale required for the bandwidth
'''
similarity=distance.squareform(distance.pdist(X.T ,'sqeuclidean'))
if option == 'std':
scale=np.std(similarity.flatten())
elif option == 'median':
scale = np.median(similarity.flatten())
else:
raise SystemExit(f'Wrong option {option} in check_sim')
sigma = np.sqrt(scale/2)
print(f'Bandwidth for normalized distances: sigma {sigma}, Option: {option}, Value = {scale}')
check = np.std(distance.squareform(distance.pdist(X.T ,'sqeuclidean'))/(2*sigma**2))
print(f'Check std for normalized distances {check}')
ax.hist(similarity.flatten(),200,density=True)
ax.set_xlabel('Distance')
ax.set_ylabel('Density')
ax.set_title(tit)
delta = sigma
return delta
[docs]
def order_w(w,option='magnitude',direction='up',tdelta=1):
'''
Order Eigenvalues according to `option`
Parameters
----------
option : String
* 'magnitude' abs(w)
* 'frequency' w.imag
* 'growth' log(w).real/dt
* 'ones' abs(w) closest to 1.0
* 'stable' log(w).real/dt irrespextive of sign
direction : String
* 'up' descending
* 'down' ascending
delta
Sampling frequency
Returns
-------
w0 : Array
Ordered Koopman Eigenvalues
w1 : Array
Ordered generator eigenvalues
ind : Array
Index of order for ordering Koopman modes
'''
print(' Ordering Eigenvalues as ', option, ' with direction ',direction)
w_cont = np.log(w)/tdelta
if option == 'magnitude':
ind=abs(w).argsort()
elif option == 'frequency':
ind=abs(w_cont.imag).argsort()
elif option == 'growth':
ind=w_cont.real.argsort()
elif option == 'one':
ind=np.abs(np.abs(w) - 1.0).argsort()
elif option == 'stable':
ind=abs(w_cont.real).argsort()
else:
print(' Error in order_w', option, direction)
# Choose direction
if direction == 'up':
indu=ind[::-1]
else:
indu=ind
w0=w[indu]
w1=w_cont[indu]
return w0,w1,indu
[docs]
def expand_modes(v_o,KM_o,VM_o, vw_o,v_s,KM_s,VM_s,vw_s , \
modetype='', description='', \
X=None,file=None):
'''
Expand modes into the original data shape,
optionally write to disk. complex numbers are not supported by netcdf4,
so we use h5netcdf with
`ds.to_netcdf(file,engine="h5netcdf", invalid_netcdf=True)`
Parameters
----------
v_o,KM_o,VM_o, vw_o: Arrays
Original norms, Koopman Modes, Eigenfunction values, Eigenvalues
v_s,KM_s,VM_s,vw_s: Arrays
Ordered, norms, Koopman Modes, Eigenfunction values, Eigenvalues
X: Xmat class
Original data array with geographical information.
file: str
Name of file to be written. Optional, if `None` no file is written
Returns
-------
ds : Xarray Dataset
Dataset with Koopman decomposition
Examples
--------
>>> dskm = zkop.expand_modes(v_o,KM_o,VM_o, vw_o,v_s,KM_s,VM_s,vw_s , X=X,file='filename')
>>> # Read with
>>> dds=xr.open_dataset('filename',engine='h5netcdf')
'''
X_KM_o = xr.full_like(X.A,0,dtype='float').isel(time=slice(0,KM_o.shape[1])).rename({'time':'modes'})
X_KM_o.data = KM_o
X_KM_s = xr.full_like(X.A,0,dtype='float').isel(time=slice(0,KM_s.shape[1])).rename({'time':'modes'})
X_KM_s.data = KM_s
# Create Data Sets for Koopman modes
ds = xr.Dataset(
data_vars=dict(
X_KM_o=(["x", "modes"], X_KM_o.data),
X_KM_s=(["x", "modes"], X_KM_s.data),
eigfun_o=(["time", "modes"], VM_o.data),
eigfun_s=(["time", "modes"], VM_s.data),
eigval_s=(["modes"], vw_s.data ),
eigval_o=(["modes"], vw_o.data ),
vnorm_o= (["modes"], v_o.data ),
vnorm_s= (["modes"], v_s.data ), ),
coords=dict(
feature=(["x"], np.arange(KM_o.shape[0])),
time=(["time"], np.arange(VM_o.shape[0])),
modes=(["modes"], np.arange(KM_o.shape[1]) ) ),
attrs=dict(type=modetype, description=description) )
if file:
ds.to_netcdf(file,engine="h5netcdf", invalid_netcdf=True)
return ds
[docs]
def find_modes(tper,wto, vrange, **kwargs):
'''
Find modes close to a specific Period `tper`
within a specified `vrange` in month
Parameters
----------
tper:
Target Period (in years)
wto:
Koopman Generator Eigenvalues
Also accept all other arguments for `isclose`
vrange:
Range (months)
Returns
-------
ind:
Index into closest period to `tper`
'''
pi2 = 2*math.pi
vtol = vrange/12
print(f'Seaching for modes close to {tper} Yr periods, with range {vrange} months')
near_per = np.where(np.isclose(pi2/abs(wto.imag)/12, tper,atol=vtol, **kwargs))
for i in near_per:
print(f'Found modes {i}/{len(near_per[0])}, \tPeriod {pi2/wto[i].imag/12}\n')
return near_per[0]
[docs]
def density_estimate(data,interval=[-1,-1,400],kernel='gaussian',bw=0.05,**kwargs):
'''
Compute density estimation
Using kde from sklearn
Parameters
----------
data: 1D array
Array 1D of data to be analyzed
interval: list
[min,max, number_of_points]
kernel:
kernel for estimation (default gaussian)
bw:
Bandwidth for the kernel
Other arguments to `kde` can be passed on
Returns
-------
X : Array
Points where the distribution is computed
points : Array
Value of the distribution
kde : Object
The Density object
Examples
--------
>>> density_estimate(data,interval=[-1,-1,400],kernel='gaussian',bw=0.05)
'''
Xplot = np.linspace(interval[0],interval[1],interval[2])[:, np.newaxis]
vmode=np.zeros([len(data),1])
vmode[:,0] = data
kde = KernelDensity(kernel='gaussian', bandwidth=bw).fit(vmode)
log_dens = kde.score_samples(Xplot)
points= np.exp(log_dens)
return Xplot[:,0], points, kde
[docs]
def one_over(y):
"""
Vectorized 1/x, treating x==0 manually
"""
x = np.array(y).astype(float)
near_zero = np.isclose(y, 0)
x[near_zero] = np.inf
x[~near_zero] = 1. / x[~near_zero]
return x
[docs]
def one_over_years(y):
"""Vectorized 1/x, treating x==0 manually
Period calculation from imaginary part
of generator eigenvalues 2 pi/log(w.imag)
"""
x = np.array(y).astype(float)
near_zero = np.isclose(y, 0)
x[near_zero] = np.inf
#Transform in years
x[~near_zero] = 2*math.pi / x[~near_zero]/12
return x
#
[docs]
class Koop():
""" This class creates the Koopman operator
The input array is supposed to be in features/samples form
The augemnted matrix is created inside
Parameters
----------
vdat: array
Data array with features and samples
**kwargs : dict
* 'operator' Choice of operator `Koopman` or `Perron`
* 'deltat' Time interval between samples
* 'kernel_choice' Kernel choice , `gauss`,`poly`
* 'sigma' Bandwidth for Gaussian kernel
* 'std' -- Standard Deviation of distances
* 'median -- Median of distances
* '100' -- Explicit value
* 'epsilon' Regularization parameter
* 'maxeig' Max number of eigenvalues to compute
* 'poly_order': 1 Order of polynomial kernel
* 'poly_shift':0 Shift of polynomial kernel
Attributes
----------
PsiX : numpy array
Reduced data matrix of type *array*
PsiY : numpy array
Shifted data matrix of type *array*
ntime : int
Number of samples or time points
npoints : int
Number of features or spatial points
deltat : float
Time interval between samples
operator: str
Operator chosen {'Koopman', 'Perron'}
kernel_choice: object
Kernel used in the estimator
epsilon : float
Tikhonov Regularization parameter {1e-5}
sigma : float
Bandwidth for Gaussian kernel
maxeig : int
Maximum number of eigenvalues to compute
poly_order: int
Order polynomial kernel
poly_shift: float
Shift polynomial kernel
ww: array
Koopman eigenvalues
wwg: array
Koopman generator eigenvalues
cc: array
Koopman eigenfunctions coefficients
vv: array
Koopman eigenfunction values on the samples
wf: array
Filtered Koopman eigenvalues
wfg: array
Filtered Koopman generator eigenvalues
vvf: array
Filtered Koopman eigenfunction values on the samples
Gxx: array
Kernel Matrix Gxx
Gxy: array
Kernel Matrix Gxy
ker: object
Object Kernel used
time_data:
Time array (optional)
Examples
--------
Create a Koopman operator
>>> K = Koop(X, {operator: 'koopman',
kernel_choice : 'gauss', sigma : 42, epsilon : 1e-5})
"""
__slots__ = ('PsiX','PsiY','ntime','npoints', 'operator', 'deltat','time_data',\
'kernel_choice','epsilon','sigma','maxeig','bandwidth',\
'poly_order','poly_shift', 'k_shift',\
'wf','wfg','vvf',\
'ww','wwg','vv','cc','Gxx','Gxy','ker')
def __init__(
self,
vdat,
**kwargs ):
#Set default arguments
defaultOptions = {'operator': 'Koopman', 'deltat':1,
'kernel_choice' : 'gauss', 'bandwidth' : 'std', 'epsilon' : 1e-5, 'maxeig':3000,
'poly_order': 1,'poly_shift':0,'k_shift':0.0, 'time_data': None}
options ={**defaultOptions, **kwargs}
self.PsiX = vdat[:,:-1]
'''Reduced Data matrix of type (`array`)'''
self.PsiY = vdat[:,1:]
'''Shifted data matrix of type (`array`)'''
self.ntime = self.PsiX.shape[1]
'''Number of samples or time points (`int`)'''
self.npoints = self.PsiX.shape[0]
'''Number of features or spatial points (`int`)'''
self.deltat = 1
'''Time interval between samples (`float`)'''
self.bandwidth = None
'''Option Bandwidth for Gaussian kernel (`str`)'''
self.sigma = None
'''Bandwidth for Gaussian kernel (`float`)'''
self.operator = options['operator']
'''Operator chosen (`str`) {'Koopman', 'Perron'}'''
self.kernel_choice = options['kernel_choice']
'''Kernel used in the estimator (`str`)'''
self.epsilon = options['epsilon']
'''Tikhonov Regularization parameter (`float`) {1e-5}'''
self.maxeig = options['maxeig']
'''Maximum number of eigenvalues to compute (`int`)'''
self.poly_order = options['poly_order']
'''Order polynomial kernel (`int`)'''
self.poly_shift = options['poly_shift']
'''Shift polynomial kernel (`float`)'''
self.k_shift = options['k_shift']
'''Shift kernel (`float`)'''
if options['time_data'] is not None:
self.time_data = options['time_data']
else:
self.time_data = np.arange(self.ntime)
'''Time array (`array`)'''
print(f'Created {self.operator} Estimator \n for {self.ntime} samples and {self.npoints} features, \
{self.deltat} time interval' )
self.wf = None
'''Filtered Koopman eigenvalues (`array`)'''
self.wfg = None
'''Filtered Koopman generator eigenvalues (`array`)'''
self.vvf = None
'''Filtered Koopman eigenfunction values on the samples (`array`)'''
self.ww = None
'''Koopman eigenvalues (`array`)'''
self.wwg = None
'''Koopman generator eigenvalues (`array`)'''
self.vv = None
'''Koopman eigenfunction values on the samples (`array`)'''
self.cc = None
'''Koopman eigenfunctions coefficients (`array`)'''
self.Gxx = None
'''Kernel Matrix Gxx (`array`)'''
self.Gxy = None
'''Kernel Matrix Gxy (`array`)'''
self.ker = None
'''Kernel used (`object`)'''
def __call__(self, v ):
''' Not implemented'''
print('Not implemented')
return
def __repr__(self):
''' Printing Information '''
print(f'Koopman Estimator for operator {self.operator} with paramenters:\n \
Kernel_choice {self.kernel_choice}\n \
Maxeig {self.maxeig}\n \
Epsilon {self.epsilon} \n ')
if self.kernel_choice == 'poly':
print(f' Poly_Order {self.poly_order} \n \
Poly_Shift {self.poly_shift}\n ')
elif self.kernel_choice == 'gauss':
print(f' Bandwidth {self.sigma}\n \
')
return '\n'
[docs]
def fit(self,bandwidth=None,verbose=False):
'''
Fitting estimator.
Select bandwidth with `bandwidth`
Parameters
----------
bandwidth: string
Select the bandiwidth for the Gaussian kernel
* 'std' Use bandwidth that make the distances of std 1
* 'median' Use median value of distances
* '100' Use this explicit numerical value
verbose: Boolean
True Print Messages
False No Messages (Defaults)
Returns
-------
ww:
Eigenvalues
self.wwg = np.log(ww_tot)/self.deltat:
Generator Eigenvalues
self.vv:
Value of Eigenfunctions at samples
self.Gxx:
kernel matrix Gxx
self.Gxy:
kernel matrix Gxy
self.ker:
Kernel used
'''
if self.operator == 'Perron':
ops='P'
elif self.operator =='Koopman':
ops='K'
else:
raise ValueError('TRANSFER_OP_GAUSS: --- Wrong Operator choice ')
print(' Calculating {} Operator '.format(self.operator))
self.bandwidth = bandwidth
if bandwidth is None or bandwidth == 'std':
similarity=distance.squareform(distance.pdist(self.PsiX.T ,'sqeuclidean'))
scale=np.std(similarity.flatten())
elif bandwidth == 'median':
similarity=distance.squareform(distance.pdist(self.PsiX.T ,'sqeuclidean'))
scale = np.median(similarity.flatten())
else:
scale = 2*(float(bandwidth))**2
self.sigma = np.sqrt(scale/2)
if self.kernel_choice == 'gauss':
if verbose:
print(f'Using option {bandwidth} for a {scale} and sigma {self.sigma}\n')
k = kernels.gaussianKernel(self.sigma)
elif self.kernel_choice == 'poly':
k = kernels.polynomialKernel(p=self.poly_order,c=self.poly_shift)
elif self.kernel_choice == 'gaussShifted':
k = kernels.gaussianKernelShifted(self.sigma, shift=self.k_shift)
else:
raise ValueError(f'KK_fit: --- WrongKernel choice {self.kernel_choice}')
ww_tot,vv_tot,cc_eig,Am,Gxx,Gxy =al.kedmd(self.PsiX,self.PsiY, k, \
epsilon=self.epsilon, evs=self.maxeig, operator=ops,kind='kernel')
if verbose:
print(f'Computed Transfer Eigenvalues')
print(f'Rank of Gxx Matrix {lin.matrix_rank(Gxx)} of dimension {Gxx.shape[0]}\n')
self.ww = ww_tot
self.wwg = np.log(ww_tot)/self.deltat
self.vv = vv_tot
self.cc = cc_eig
self.Gxx = Gxx
self.Gxy = Gxy
self.ker = k
return
[docs]
def order(self,choice='frequency',direction='down',cut_period=3):
'''
Order Eigenvalues according to `option`
Parameters
----------
option:
* 'magnitude' abs(w)
* 'frequency' w.imag
* 'growth' log(w).real/dt
* 'ones' abs(w) closest to 1.0
* 'stable' log(w).real/dt irrespextive of sign
direction:
* 'up' descending
* 'down' ascending
cut_period:
Eliminate modes with shorter periods (sample units)
Returns
-------
wf:
Ordered Koopman Eigenvalues
wfg:
Ordered generator eigenvalues
vvf:
Ordered Koopman eigenfunctions
'''
# cut period in (months)
if cut_period is not 0:
wmax = np.max(np.where(2*math.pi/self.wwg.imag > cut_period))
else:
wmax = len(self.wwg.imag)
_wf =self.ww[0:wmax]
_vvf = self.vv[:,0:wmax]
print(f'Eliminating frequency higher than {cut_period} months, \
original modes {len(self.ww)}, remaining {len(_wf)}')
wf,wfg, indf = order_w(_wf,option=choice,direction=direction,tdelta=self.deltat)
self.wf = wf
self.wfg = wfg
self.vvf = _vvf[:,indf]
return wf, wfg, self.vvf
[docs]
def eigone(self,ww,vv,tol=0.1,select='one',verbose=True):
'''
Choose stable eigenvalues, either according their absolute value
or according the sign of the real part
Parameters
----------
ww: complex
Koopman Eigenvalues
vv: array complex
Koopman eigenfunctions
select:
* `one` Eigenvalues close to one in Abs less than `tol`
* `stable` Real part < 0
* `unstable` Real part > 0
tol:
Tolerance
verbose:
Print Diagnostics
Returns
-------
ww:
Selected eigenvalues
vv:
Selected eigenfunctions
ind:
Index of selected eigenvalues
'''
print(f'Keeping only EI close to unity less than tol {tol}')
indone = choose_eig(select,ww,tol,self.deltat)
N1 = len(indone)
wone = ww[indone]
print(f'\n Trace of generator eigenvalues close to unity according to tolerance {tol} ---> {sum(np.log(wone))}')
print(f' Factor of Phase Volume contraction {np.exp(sum(np.log(wone))):.6f} per month\n')
# Order retained unitary eigenvalues and obtain generator eigenvalues 'expwr'
print(f'Number of Eigenvalues found {N1} over {len(ww)} for kmode tol {tol}\n\n')
# _,expwr,indexpwr = order_w(wone,option='stable',direction='down',tdelta=1)
headr = ('Mode','Real Eig','Im Eig','Abs','Gen Real (Yr)','Gen Imag (Yr)')
kaz_data = [(i,wone[i].real, wone[i].imag, abs(wone[i]), 1./np.log(wone[i]).real/12, 2*math.pi/np.log(wone[i]).imag/12) for i in range(min(len(wone),50))]
if verbose:
print(tab.tabulate(kaz_data, headr,stralign='center',tablefmt='github'))
_vv = vv[:,indone]
return wone,_vv,indone
[docs]
def compute_modes(self,ww,vv,modetype='',description='',normalization=False):
'''
Compute Koopman Modes
Parameters
----------
ww:
Koopman eigenvalues
vv:
Value of Koopman EIgenfunctions at samples
modetype:
Descriptive label
description:
Data Descritpion
normalization:
If `False` (Default) the eigenfunctions are not normalized, se to`True`
to get Koopman modes over normalized eigenfunctions.
Returns
-------
ds : data set
.. table:: Content for `ds`
:widths: auto
====================== ============================================
Variable Description
====================== ============================================
keig(time,modes) Normalized Koopman eignfunction values at sample points
kmodes(features,modes) Koopman modes
periods Periods of Koopman modes
eigenvalues Koopman Eigenvalues
====================== ============================================
'''
Nmodes = vv.shape[1]
Ntimes = self.ntime
Nfeatures = self.npoints
# Normalize Koopman eigenfunctions
if normalization:
vvnorm = sc.norm(vv,axis=0)
vnormalized = vv@np.diag(1./vvnorm)
else:
vnormalized = vv
# The ordering is consistent with Mezic
Phiinv, vrank = sc.pinv(vnormalized,return_rank=True)
KMM = Phiinv@(self.PsiX.T)
print(f'Computed Kmodes {KMM.shape} for {vv.shape} normalized eigenfunctions')
print(f'Computed Kmodes {Nmodes} {Nfeatures} vv {Ntimes} {Nmodes} eigenfunctions')
print(f'Rank of V matrix {vrank}')
# Create Data Sets for Koopman modes
ds = xr.Dataset(
data_vars=dict(
kmodes=(["x", "modes"], KMM.T),
eigfun=(["time", "modes"], vnormalized),
periods=(["modes"], 2*math.pi/np.log(ww).imag/12),
eigval=(["modes"], ww ) ),
coords=dict(
feature=(["x"], np.arange(Nfeatures)),
time=(["time"], self.time_data),
modes=(["modes"], np.arange(Nmodes) ) ),
attrs=dict(type=modetype, description=description) )
return ds
[docs]
def reconstruct_koopman(self,ds,decode=None,select_modes=None):
'''
Reconstruction of data from Koopman modes
Parameters
----------
ds:
Koopman decomposition from `compute_modes`
decode:
Array decoding features
decode:
If not empty contains the array for decoding
to physical space
select_modes: list
If not empty reconstrut using only selected modes
[2,7,4]
Returns
-------
data : Array
Reconstructed data
'''
if select_modes is None:
y = ds.eigfun@ds.kmodes
else:
y = ds.eigfun.isel(modes=select_modes)@ds.kmodes.isel(modes=select_modes)
#Check Reality
if np.sum(y.imag**2) > 1.e-14:
raise ValueError(f'reconstruct_koopman: --- Array not Real {np.sum(y.imag**2)}')
else:
y = y.real
if decode is None:
return y
else:
return decode@y.data.T
[docs]
def find_norm_modes(self,KM, KE, KEI, sort='sort',decode=None,simplify_conjugates=True):
'''
Compute norms of eigenfunctions
eliminate complex conjugates, optionally sort in order
of magnitude
Parameters
----------
KM:
Koopman Modes (KM)
KE:
Koopman Eigenfunctions (KE)
KEI:
Koopman EIgenvalues (KEI)
sort:
'sort' -- get also sorted eigenvector in order of magnitude
decode:
None -- No decoding is applied, otherwise use array `decode`
simplify_conjugates:
If `True` only modes with imaginary part >= 0 are retained
Returns
-------
(vnorm, KM, VM, KEI):
Koopman Modes Norms, Koopman Modes, Eigenfunctions, Eigenvalues
(vnormsort, KM_sort, VM_sort, KEI_sort):
Sorted Koopman Modes Norms, Koopman Modes, Eigenfunctions, Eigenvalues
'''
# Eliminate complex conjugates
if simplify_conjugates:
print(f'\n `find_norm_modes` Retain only positive frequencies or zero \n')
ivar = np.where(KEI.imag >= 0)
KM_single = KM[:,ivar[0]]
VM_single = KE[:,ivar[0]]
wti = KEI[ivar[0]]
else:
KM_single = KM[:,:]
VM_single = KE[:,:]
wti = KEI[:]
if decode is None:
KM_out = KM_single.data
else:
KM_out = decode@KM_single.data
vvar = np.diag(KM_single.T.conj().data@KM_single.data).real
vnorm = np.sqrt(vvar)
if sort == 'sort':
# Sort in order of magnitude
indnorm = np.argsort(vnorm)[::-1]
return (vnorm,KM_out, VM_single, wti), (indnorm, vnorm[indnorm],KM_out[:,indnorm], VM_single[:,indnorm], wti[indnorm])
else:
return (vnorm,KM_out, VM_single, wti)
[docs]
def eigenfunction_value( self, x, mode ):
'''
Compute eigenfunction value at point x
Parameters
----------
x:
Point at which eigenfunction is computed
mode:
Eigenfunction number
Returns
-------
eigenfunction value
'''
k = self.ker
f = 0
for i in range(self.ntime):
f += self.cc[i,mode]*k(x, self.PsiX[:, i])
return f
[docs]
class KoopGen():
""" This class creates the Generator Koopman operator
The input array is supposed to be in features/samples form
The augemnted matrix is created inside
Parameters
----------
vdat: array
Data array with features and samples
**kwargs : dict
* 'operator' Choice of operator `Koopman` or `Perron`
* 'deltat' Time interval between samples
* 'kernel_choice' Kernel choice , `gauss`,`poly`
* 'sigma' Bandwidth for Gaussian kernel
* 'std' -- Standard Deviation of distances
* 'median -- Median of distances
* '100' -- Explicit value
* 'epsilon' Regularization parameter
* 'maxeig' Max number of eigenvalues to compute
* 'poly_order': 1 Order of polynomial kernel
* 'poly_shift':0 Shift of polynomial kernel
Attributes
----------
PsiX : numpy array
Reduced data matrix of type *array*
PsiY : numpy array
Shifted data matrix of type *array*
ntime : int
Number of samples or time points
npoints : int
Number of features or spatial points
deltat : float
Time interval between samples
operator: str
Operator chosen {'Koopman', 'Perron'}
kernel_choice: object
Kernel used in the estimator
epsilon : float
Tikhonov Regularization parameter {1e-5}
sigma : float
Bandwidth for Gaussian kernel
maxeig : int
Maximum number of eigenvalues to compute
poly_order: int
Order polynomial kernel
poly_shift: float
Shift polynomial kernel
ww: array
Koopman eigenvalues
wwg: array
Koopman generator eigenvalues
vv: array
Koopman eigenfunction values on the samples
wf: array
Filtered Koopman eigenvalues
wfg: array
Filtered Koopman generator eigenvalues
vvf: array
Filtered Koopman eigenfunction values on the samples
Gxx: array
Kernel Matrix Gxx
Gxy: array
Kernel Matrix Gxy
ker: object
Object Kernel used
Examples
--------
Create a Koopman operator
>>> K = Koop(X, {operator: 'koopman',
kernel_choice : 'gauss', sigma : 42, epsilon : 1e-5})
"""
__slots__ = ('PsiX','PsiY','ntime','npoints', 'operator', 'deltat',\
'kernel_choice','epsilon','sigma','maxeig','bandwidth',\
'poly_order','poly_shift', 'k_shift',\
'wf','wfg','vvf',\
'ww','wwg','vv','Gxx','Gxy','ker')
def __init__(
self,
vdat,
**kwargs ):
#Set default arguments
defaultOptions = {'operator': 'Koopman', 'deltat':1,
'kernel_choice' : 'gauss', 'bandwidth' : 'std', 'epsilon' : 1e-5, 'maxeig':2000,
'poly_order': 1,'poly_shift':0,'k_shift':0.0}
options ={**defaultOptions, **kwargs}
self.PsiX = vdat[:,:-1]
'''Reduced Data matrix of type (`array`)'''
self.PsiY = vdat[:,1:]
'''Shifted data matrix of type (`array`)'''
self.ntime = self.PsiX.shape[1]
'''Number of samples or time points (`int`)'''
self.npoints = self.PsiX.shape[0]
'''Number of features or spatial points (`int`)'''
self.deltat = 1
'''Time interval between samples (`float`)'''
self.bandwidth = None
'''Option Bandwidth for Gaussian kernel (`str`)'''
self.sigma = None
'''Bandwidth for Gaussian kernel (`float`)'''
self.operator = options['operator']
'''Operator chosen (`str`) {'Koopman', 'Perron'}'''
self.kernel_choice = options['kernel_choice']
'''Kernel used in the estimator (`str`)'''
self.epsilon = options['epsilon']
'''Tikhonov Regularization parameter (`float`) {1e-5}'''
self.maxeig = options['maxeig']
'''Maximum number of eigenvalues to compute (`int`)'''
self.poly_order = options['poly_order']
'''Order polynomial kernel (`int`)'''
self.poly_shift = options['poly_shift']
'''Shift polynomial kernel (`float`)'''
self.k_shift = options['k_shift']
'''Shift kernel (`float`)'''
print(f'Created {self.operator} Estimator \n for {self.ntime} samples and {self.npoints} features, \
{self.deltat} time interval' )
self.wf = None
'''Filtered Koopman eigenvalues (`array`)'''
self.wfg = None
'''Filtered Koopman generator eigenvalues (`array`)'''
self.vvf = None
'''Filtered Koopman eigenfunction values on the samples (`array`)'''
self.ww = None
'''Koopman eigenvalues (`array`)'''
self.wwg = None
'''Koopman generator eigenvalues (`array`)'''
self.vv = None
'''Koopman eigenfunction values on the samples (`array`)'''
self.Gxx = None
'''Kernel Matrix Gxx (`array`)'''
self.Gxy = None
'''Kernel Matrix Gxy (`array`)'''
self.ker = None
'''Kernel used (`object`)'''
def __call__(self, v ):
''' Not implemented'''
print('Not implemented')
return
def __repr__(self):
''' Printing Information '''
print(f'Koopman Estimator for operator {self.operator} with paramenters:\n \
Kernel_choice {self.kernel_choice}\n \
Maxeig {self.maxeig}\n \
Epsilon {self.epsilon} \n ')
if self.kernel_choice == 'poly':
print(f' Poly_Order {self.poly_order} \n \
Poly_Shift {self.poly_shift}\n ')
elif self.kernel_choice == 'gauss':
print(f' Bandwidth {self.sigma}\n \
')
return '\n'
[docs]
def fit(self,bandwidth=None,derivative=None):
'''
Fitting generator estimator.
Select bandwidth with `bandwidth`
Parameters
----------
bandwidth: string
Select the bandiwidth for the Gaussian kernel
* 'std' Use bandwidth that make the distances of std 1
* 'median' Use median value of distances
* '100' Use this explicit numerical value
derivative: array
Derivative of the data to be used in the generator.
If not provided, the derivative is computed. (not implemented)
Returns
-------
ww:
Eigenvalues
self.wwg = np.log(ww_tot)/self.deltat:
Generator Eigenvalues
self.vv:
Value of Eigenfunctions at samples
self.Gxx:
kernel matrix Gxx
self.Gxy:
kernel matrix Gxy
self.ker:
Kernel used
'''
if self.operator == 'Perron':
ops='P'
elif self.operator =='Koopman':
ops='K'
else:
raise ValueError('TRANSFER_OP_GAUSS: --- Wrong Operator choice ')
print(' Calculating {} Operator '.format(self.operator))
self.bandwidth = bandwidth
if bandwidth is None or bandwidth == 'std':
similarity=distance.squareform(distance.pdist(self.PsiX.T ,'sqeuclidean'))
scale=np.std(similarity.flatten())
elif bandwidth == 'median':
similarity=distance.squareform(distance.pdist(self.PsiX.T ,'sqeuclidean'))
scale = np.median(similarity.flatten())
else:
scale = 2*(float(bandwidth))**2
self.sigma = np.sqrt(scale/2)
print(f'Using bandwidth {bandwidth} for a scale of {scale} and sigma {self.sigma}\n')
if self.kernel_choice == 'gauss':
k = kernels.gaussianKernel(self.sigma)
elif self.kernel_choice == 'poly':
k = kernels.polynomialKernel(p=self.poly_order,c=self.poly_shift)
elif self.kernel_choice == 'gaussShifted':
k = kernels.gaussianKernelShifted(self.sigma, shift=self.k_shift)
else:
raise ValueError(f'KK_fit: --- WrongKernel choice {self.kernel_choice}')
if derivative is not None:
self.PsiY = derivative
else:
raise ValueError(f'KK_fitgen: Provide value for Derivative {derivative}')
ww_tot,vv_tot,A,Gxx,Gxy =al.kgedmd(self.PsiX,self.PsiY, k, \
epsilon=self.epsilon, evs=self.maxeig, operator=ops,kind='kernel')
print(f'Computed Transfer Eigenvalues')
self.ww = np.exp(ww_tot*self.deltat)
self.wwg = ww_tot
self.vv = vv_tot
self.Gxx = Gxx
self.Gxy = Gxy
self.ker = k
return
[docs]
def order(self,choice='frequency',direction='down',cut_period=3):
'''
Order Eigenvalues according to `option`
Parameters
----------
option:
* 'magnitude' abs(w)
* 'frequency' w.imag
* 'growth' log(w).real/dt
* 'ones' abs(w) closest to 1.0
* 'stable' log(w).real/dt irrespextive of sign
direction:
* 'up' descending
* 'down' ascending
cut_period:
Eliminate modes with shorter periods (sample units)
Returns
-------
wf:
Ordered Koopman Eigenvalues
wfg:
Ordered generator eigenvalues
vvf:
Ordered Koopman eigenfunctions
'''
# cut period in (months)
wmax = np.max(np.where(2*math.pi/self.wwg.imag > cut_period))
_wf =self.ww[0:wmax]
_vvf = self.vv[:,0:wmax]
print(f'Eliminating frequency higher than {cut_period} months, \
original modes {len(self.ww)}, remaining {len(_wf)}')
wf,wfg, indf = order_w(_wf,option=choice,direction=direction,tdelta=self.deltat)
self.wf = wf
self.wfg = wfg
self.vvf = _vvf[:,indf]
return wf, wfg, self.vvf
[docs]
def eigzero(self,ww,vv,tol=0.1,option='zero'):
'''
Choose stable eigenvalues, either according their absolute value
or according the sign of the real part
Parameters
----------
ww: complex
Koopman Generator Eigenvalues
vv: array complex
Koopman eigenfunctions
select:
* `zero` Eigenvalues close to zero real part less than `tol`
* `stable` Real part < 0
* `unstable` Real part > 0
tol:
Tolerance
Returns
-------
ww:
Selected eigenvalues
vv:
Selected eigenfunctions
ind:
Index of selected eigenvalues
'''
print(f'Keeping only EI close to zero less than tol {tol}')
if option == 'zero':
indone = np.where(np.isclose(ww.real,0,atol=tol))[0]
elif option == 'stable':
indone = np.where(ww.real < 0 )[0]
elif option == 'unstable':
indone = np.where(ww.real > 0 )[0]
else:
raise SystemExit(f'Wrong option {option} in Choose_eig')
N1 = len(indone)
wone = ww[indone]
print(f'\n Trace of generator eigenvalues close to unity according to tolerance {tol:.5f} ---> {sum(wone):.6f}')
print(f' Factor of Phase Volume contraction {sum(abs(np.exp(wone)))/len(wone):.6f} per month\n')
# Order retained unitary eigenvalues and obtain generator eigenvalues 'expwr'
print(f'Number of Eigenvalues found {N1} over {len(ww)} for kmode tol {tol}\n')
# _,expwr,indexpwr = order_w(wone,option='stable',direction='down',tdelta=1)
headr = ('Mode','Eig-Real','Eig-Imag','Gen Real (Yr)','Gen Imag (Yr)')
kaz_data = [(i,wone[i].real,wone[i].imag, 1./wone[i].real/12, 2*math.pi/wone[i].imag/12) for i in range(min(len(wone),50))]
print(tab.tabulate(kaz_data, headr,stralign='center',tablefmt='github'))
_vv = vv[:,indone]
return wone,_vv,indone
[docs]
def compute_modes(self,ww,vv,modetype='',description=''):
'''
Compute Koopman Modes
Parameters
----------
ww:
Koopman eigenvalues
vv:
Value of Koopman EIgenfunctions at samples
modetype:
Descriptive label
description:
Data Descritpion
Returns
-------
ds : data set
.. table:: Content for `ds`
:widths: auto
====================== ============================================
Variable Description
====================== ============================================
keig(time,modes) Koopman eignfunction values at sample points
kmodes(features,modes) Koopman modes
periods Periods of Koopman modes
eigenvalues Koopman Eigenvalues
====================== ============================================
'''
Nmodes = vv.shape[1]
Ntimes = self.ntime
Nfeatures = self.npoints
# The ordering is consistent with Mezic
Phiinv= sc.pinv(vv)
KMM = Phiinv@(self.PsiX.T)
print(f'Computed Kmodes {KMM.shape} for {vv.shape} eigenfunctions')
# print(f'Computed KMM {Nmodes} {Nfeatures} vv {Ntimes} {Nmodes} eigenfunctions')
# Create Data Sets for Koopman modes
ds = xr.Dataset(
data_vars=dict(
kmodes=(["x", "modes"], KMM.T),
eigfun=(["time", "modes"], vv),
periods=(["modes"], 2*math.pi/ww.imag/12),
eigval=(["modes"], ww ) ),
coords=dict(
feature=(["x"], np.arange(Nfeatures)),
time=(["time"], np.arange(Ntimes)),
modes=(["modes"], np.arange(Nmodes) ) ),
attrs=dict(type=modetype, description=description) )
return ds
[docs]
def reconstruct_koopman(self,ds,decode=None,select_modes=None):
'''
Reconstruction of data from Koopman modes
Parameters
----------
ds:
data set from `compute_modes`
decode:
Array decoding features
decode:
If not empty contains the array for decoding
to physical space
select_modes: list
If not empty reconstrut using only selected modes
[2,7,4]
Returns
-------
data : Array
Reconstructed data
'''
if select_modes is None:
y = ds.eigfun@ds.kmodes
else:
y = ds.eigfun[:,select_modes]@ds.kmodes[:,select_modes]
#Check Reality
if np.sum(y.imag**2) > 1.e-14:
raise ValueError('reconstruct_koopman: --- Array not Real ')
else:
y = y.real
if decode is None:
return y
else:
return decode@y.data.T
[docs]
def find_norm_modes(self,KM, KE, KEI, sort='sort',decode=None):
'''
Compute norms of eigenfunctions
eliminate complex conjugates, optionally sort in order
of magnitude
Parameters
----------
KM:
Koopman Modes (KM)
KE:
Koopman Eigenfunctions (KE)
KEI:
Koopman EIgenvalues (KEI)
sort:
'sort' -- get also sorted eigenvector in order of magnitude
decode:
None -- No decoding is applied, otherwise use array `decode`
Returns
-------
(vnorm, KM, VM, KEI):
Norms, Koopman Modes, Eigenfunctions, Eigenvalues
(vnormsort, KM_sort, VM_sort, KEI_sort):
Sorted Norms, Koopman Modes, Eigenfunctions, Eigenvalues
'''
# Eliminate complex conjugates
ivar = np.where(KEI.imag >= 0)
KM_single = KM[:,ivar[0]]
VM_single = KE[:,ivar[0]]
wti = KEI[ivar[0]]
if decode is None:
KM_out = KM_single.data
else:
KM_out = decode@KM_single.data
if sort == 'sort':
vvar = np.diag(KM_single.T.conj().data@KM_single.data).real
vnorm = vvar
indnorm = np.argsort(vnorm)[::-1]
return (vnorm,KM_out, VM_single, wti), (vnorm[indnorm],KM_out[:,indnorm], VM_single[:,indnorm], wti[indnorm])
else:
return (vnorm,KM_out, VM_single, wti)
[docs]
class KEig():
'''
Class for computing eigenfunctions values from coefficients
'''
__slots__ = ('k','X','n','m', 'v', 'nv','max','min','d')
def __init__(self, k, X, v, nv):
'''
Constructor for the eigenfunctionc objects
'''
self.k = k
'''Kernel '''
self.X = X
'''Data matrix containing snapshots'''
self.v = v
'''Coefficients of eigenfunction'''
self.nv = nv
'''Order of the Eigenfunction'''
self.n = X.shape[0]
'''Dimension of state space'''
self.m = X.shape[1]
'''Number of snapshots'''
self.max = np.amax(v[:,nv] )
'''Max of eigenfunction values'''
self.min = np.amin(v[:,nv] )
'''Min of eigenfunction values'''
self.d = self.max-self.min
'''Distance of eigenfunction'''
print(' Max {} Min {}'.format(self.max,self.min))
def __call__(self, x):
''' Function evaluation.'''
f = 0
for i in range(self.m):
f += self.v[i,self.nv]*self.k(x, self.X[:, i])
return f
def __repr__(self):
''' Printing Information '''
print(' \n Eigenfunction for {} \n'.format(self.k))
print(' Number of Eigenfunction N={} \n'.format(self.nv))
print(' Max={}, Min={} \n'.format(self.max,self.min))
return '\n'
[docs]
def fdfeval(self, x):
'''Function and gradient evaluation.'''
f = 0
df = np.zeros((self.d,))
for i in range(self.n):
fi = self.v[i]*self.k(x, self.X[:, i])
f += fi
df -= 2/(2*self.k.sigma**2)*(x - self.X[:, i])*fi
return f, df
[docs]
def xmax(self, fac):
'''
Extract States within a certain factor from Max
Inputs:
fac Distance from Max
Return:
test the average state
indv the indices of the states averaged
'''
indb = self.v > self.max-self.d*fac
indv =np.where(indb)[0]
if len(indv) == 0:
print("Error in xmax, no index found for value {}".format(fac))
return
else:
test=np.mean(self.X[:,indb],axis=1)
print('Indices of states for Max {}'.format(indv))
return test,indv
[docs]
def xmin(self, fac):
'''
Extract States within a certain factor from Min
Inputs:
fac Distance from Max
Return:
test the average state
indv the indices of the states averaged
'''
indb = self.v < self.min+self.d*fac
indv =np.where(indb)[0]
if len(indv) == 0:
print("Error in xmin, no index found for value {}".format(fac))
return
else:
test=np.mean(self.X[:,indb],axis=1)
print('Indices of states for Min {}'.format(indv))
return test,indv
[docs]
def xclose(self, val,tol=0.001):
'''
Extract States close a certain factor from max
Return:
test the average state
indv the indices of the states averaged
'''
indb=abs(self.Gv - val) < tol
indv =np.where(indb)[0]
if len(indv) == 0:
print("Error in xmin, no index found close to value {}".format(val))
return
else:
test=np.mean(self.X[:,indb],axis=1)
print('Indices of states {} close to {}'.format(indv,val))
return test,indv