'''
Classes for the AI project.

Classes
=======
Field:
    Class describing a field (name, levels, area, EOF truncation, dates)
EarlyStopping:
    Class for early stopping of training
TimeSeriesDataset:
    Class for time series datasets, with optional time features for transformers
PositionalEmbedding:
    Learnable positional embedding over the time and feature axes
FixedPositionalEmbedding:
    Fixed sinusoidal positional embedding over the time and feature axes
FeaturePositionalEmbedding:
    Fixed sinusoidal positional embedding over the feature axis only
SymmetricFeatureScaler:
    Scikit-learn compatible scaler mapping each feature to a symmetric range
IdentityScaler:
    No-op scaler complying with the scikit-learn estimator API
'''
import os, sys
import math
import numpy as np
import xarray as xr
import pandas as pd
import scipy.linalg as sc
import matplotlib.pyplot as plt
# import datetime
import time as tm
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import torch.nn.functional as F
import transformers as tr
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.base import BaseEstimator, TransformerMixin
import zapata.computation as zcom
import zapata.data as zd
import zapata.lib as zlib
import zapata.mapping as zmap
import zapata.koopman as zkop
class Field():
'''
Class for fields
Parameters
==========
name: string
Name of the field
levels: string
Level of the field
area: string
Area to be analyzed, possible values are
* 'TROPIC': Tropics
* 'GLOBAL': Global
* 'PACTROPIC': Pacific Tropics
* 'WORLD': World
* 'EUROPE': Europe
* 'NORTH_AMERICA': North America
* 'NH-ML': Northern Hemisphere Mid-Latitudes
    mr: int
        Number of EOFs retained
    dstart: string
        Start date for the field (default '1/1/1940')
    dtend: string
        End date for the field (default '12/31/2022')
    dropX: boolean
        Option flag stored with the field (default ``False``)
Attributes
==========
name: string
Name of the field
levels: string
Level of the field
area: string
Area of the field
    mr: int
        Number of EOFs retained
    dstart: string
        Start date for the field
    dtend: string
        End date for the field
    dropX: boolean
        Value of the ``dropX`` flag
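
    Example
    -------
    A minimal sketch; the name, level and area values below are
    illustrative assumptions, not taken from a real dataset.

    >>> f = Field('Z', '500', 'GLOBAL', mr=20)
    >>> f()
    ('Z', '500')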
'''
    def __init__(self, name, levels, area, mr, dstart='1/1/1940', dtend='12/31/2022', dropX=False):
self.name = name
self.levels = levels
self.mr = mr
self.area = area
self.dstart = dstart
self.dtend = dtend
self.dropX = dropX
    def __call__(self):
        return self.name, self.levels
    def __repr__(self):
        ''' Printable information '''
        return (f'Field: {self.name}, Levels: {self.levels}, Area: {self.area}, DropX: {self.dropX}\n'
                f'EOF Retained: {self.mr}, StartDate: {self.dstart}, EndDate: {self.dtend}')
# Early stopping class
class EarlyStopping:
'''
Class for early stopping
Parameters
==========
patience: int
Number of epochs to wait before stopping
    verbose: boolean
        If True, print a message when early stopping triggers
delta: float
Minimum change in loss to be considered an improvement
Attributes
==========
patience: int
Number of epochs to wait before stopping
    verbose: boolean
        If True, print a message when early stopping triggers
delta: float
Minimum change in loss to be considered an improvement
counter: int
Number of epochs since last improvement
best_score: float
Best loss score
early_stop: boolean
If True, stop the training
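
    Example
    -------
    A minimal training-loop sketch (``validate`` and ``model`` are
    hypothetical names, not part of this module):

    >>> stopper = EarlyStopping(patience=3, verbose=True)
    >>> for epoch in range(100):            # doctest: +SKIP
    ...     val_loss = validate(model)
    ...     if stopper(val_loss):
    ...         break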
'''
def __init__(self, patience=5, verbose=False, delta=0):
self.patience = patience
self.verbose = verbose
self.counter = 0
self.best_score = None
self.early_stop = False
self.delta = delta
    def __call__(self, val_loss):
        if self.best_score is None:
            # First call: record the initial loss as the best score
            self.best_score = val_loss
            return False
        elif val_loss > self.best_score + self.delta:
            # Loss worsened by more than delta: count towards patience
            self.counter += 1
            if self.counter >= self.patience:
                self.early_stop = True
                if self.verbose:
                    print(f'Early stopping: no improvement for {self.counter} epochs')
        else:
            # Loss improved (or stayed within delta): reset the counter
            self.best_score = val_loss
            self.counter = 0
        return self.early_stop
class TimeSeriesDataset(Dataset):
'''
    Class for time series datasets.
    Optionally includes time features for transformers.
Parameters
==========
datasrc: numpy array
Source data
datatgt: numpy array
Target data
TIN: int
Input time steps
MIN: int
Input variables size
    T: int
        Prediction time steps
    K: int
        Output variables size
    time_features: numpy array (optional)
        If not ``None``, contains time features
    shift: int
        Overlap between source and target; for transformers use
        ``shift = 0``, for LSTM ``shift`` should be ``TIN - T``
Attributes
==========
datasrc: numpy array
Source data
datatgt: numpy array
Target data
time_features: numpy array
Time features
TIN: int
Input time steps
MIN: int
Input variables
T: int
Output time steps
K: int
Output variables
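
    Example
    -------
    A minimal sketch with random data; all sizes here are assumed values.

    >>> import numpy as np
    >>> from torch.utils.data import DataLoader
    >>> src = np.random.randn(100, 8).astype('float32')
    >>> tfeat = np.random.randn(100, 4).astype('float32')
    >>> ds = TimeSeriesDataset(src, src, TIN=12, MIN=8, T=6, K=8, time_features=tfeat)
    >>> len(ds)
    83
    >>> loader = DataLoader(ds, batch_size=16, shuffle=True)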
'''
def __init__(self, datasrc, datatgt, TIN, MIN, T, K, time_features=None):
self.datasrc = datasrc
self.datatgt = datatgt
self.TIN = TIN
self.MIN = MIN
self.T = T
self.K = K
self.time_features = time_features
    def __len__(self):
        # Number of complete (input, target) windows in the series
        return len(self.datasrc) - self.TIN - self.T + 1

    def __getitem__(self, idx):
        input_seq = self.datasrc[idx:idx+self.TIN, :self.MIN]
        target_seq = self.datatgt[idx+self.TIN:idx+self.TIN+self.T, :self.K]
        if self.time_features is None:
            # No time features were given: return only the data windows
            return input_seq, target_seq
        pasft = self.time_features[idx:idx+self.TIN, :]
        futft = self.time_features[idx+self.TIN:idx+self.TIN+self.T, :]
        return input_seq, target_seq, pasft, futft
class PositionalEmbedding(nn.Module):
def __init__(self, T, neof, embedding_dim):
"""
Initialize positional embedding module.
Parameters:
- T (int): Maximum sequence length (time dimension).
- neof (int): Feature space dimension of input.
- embedding_dim (int): Dimensionality of the positional embedding for each axis.
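
        Example (a minimal sketch; batch size and dimensions are assumed values):

        >>> import torch
        >>> emb = PositionalEmbedding(T=12, neof=8, embedding_dim=16)
        >>> X = torch.randn(4, 12, 8)
        >>> emb(X).shape
        torch.Size([4, 12, 8])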
"""
super(PositionalEmbedding, self).__init__()
# Learnable embeddings for temporal and feature positions
self.time_embedding = nn.Embedding(T, embedding_dim)
self.feature_embedding = nn.Embedding(neof, embedding_dim)
# Linear layer projecting embedding_dim to 1 scalar per position
self.linear = nn.Linear(embedding_dim, 1)
def forward(self, X):
"""
Apply positional embeddings to the input sequence.
Parameters:
- X (Tensor): Input tensor of shape (batch_size, T, neof)
Returns:
- Tensor: Positional embedding applied tensor of shape (batch_size, T, neof)
"""
batch_size, T, neof = X.size()
        # Create time and feature position indices
        time_positions = torch.arange(T, device=X.device).unsqueeze(0).repeat(batch_size, 1)        # (batch_size, T)
        feature_positions = torch.arange(neof, device=X.device).unsqueeze(0).repeat(batch_size, 1)  # (batch_size, neof)
# Get embeddings for time and feature positions
time_pos_emb = self.time_embedding(time_positions) # (batch_size, T, embedding_dim)
feature_pos_emb = self.feature_embedding(feature_positions) # (batch_size, neof, embedding_dim)
# Combine positional embeddings across both axes
# Expand dims so we can broadcast: (T, 1) + (1, neof) => (T, neof)
time_pos_emb = time_pos_emb.unsqueeze(-2) # (batch_size, T, 1, embedding_dim)
feature_pos_emb = feature_pos_emb.unsqueeze(-3) # (batch_size, 1, neof, embedding_dim)
combined_pos_emb = time_pos_emb + feature_pos_emb # (batch_size, T, neof, embedding_dim)
# Project embedding_dim -> 1, then remove that last dimension
combined_pos_emb = self.linear(combined_pos_emb).squeeze(-1) # (batch_size, T, neof)
# Add positional embeddings to input
X_with_pos = X + combined_pos_emb # (batch_size, T, neof)
return X_with_pos
class FixedPositionalEmbedding(nn.Module):
"""
Creates fixed (non-learnable) positional embeddings for
both the time dimension (T) and feature dimension (neof).
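
    Example (a minimal sketch; dimensions are assumed values):

    >>> import torch
    >>> emb = FixedPositionalEmbedding(max_T=24, max_neof=16, embedding_dim=64)
    >>> X = torch.randn(2, 12, 8)
    >>> emb(X).shape
    torch.Size([2, 12, 8])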
"""
def __init__(self, max_T, max_neof, embedding_dim=64):
super().__init__()
# 1) Create zero tensors for time and feature embeddings
time_pe = torch.zeros(max_T, embedding_dim) # (max_T, embedding_dim)
feature_pe = torch.zeros(max_neof, embedding_dim) # (max_neof, embedding_dim)
# 2) Prepare position indices for time and features
position_t = torch.arange(0, max_T, dtype=torch.float).unsqueeze(1) # (max_T, 1)
position_f = torch.arange(0, max_neof, dtype=torch.float).unsqueeze(1) # (max_neof, 1)
        # 3) Compute the divisor term for the sine-cosine formula.
        #    This is the standard Transformer form
        #    exp(-(2*i / embedding_dim) * ln(10000));
        #    the same frequencies serve both the time and feature axes.
        div_term = torch.exp(
            torch.arange(0, embedding_dim, 2).float()
            * (-math.log(10000.0) / embedding_dim)
        )
        # 4) Fill time_pe with sine on even dims, cosine on odd dims
        #    position_t * div_term broadcasts (max_T, 1) * (embedding_dim/2,)
        #    => (max_T, embedding_dim/2)
        time_pe[:, 0::2] = torch.sin(position_t * div_term)   # Even indices
        time_pe[:, 1::2] = torch.cos(position_t * div_term)   # Odd indices
        # 5) Fill feature_pe with sine on even dims, cosine on odd dims
        feature_pe[:, 0::2] = torch.sin(position_f * div_term)
        feature_pe[:, 1::2] = torch.cos(position_f * div_term)
# 6) Register them as buffers => not learnable but move with .to(device)
self.register_buffer("time_pe", time_pe) # (max_T, embedding_dim)
self.register_buffer("feature_pe", feature_pe) # (max_neof, embedding_dim)
# 7) Linear layer to map from embedding_dim -> 1 offset per (t,feature)
# (If you wanted an offset of size 'neof', you'd do nn.Linear(embedding_dim, neof).)
self.linear = nn.Linear(embedding_dim, 1)
def forward(self, X):
"""
X: shape (batch_size, T, neof)
Returns: shape (batch_size, T, neof)
with a fixed (non-learnable) sinusoidal offset added.
"""
batch_size, T, neof = X.shape
# (A) Slice the needed portion of time_pe: (T, embedding_dim) -> (1, T, 1, embedding_dim)
time_pos_emb = self.time_pe[:T].unsqueeze(0).unsqueeze(2)
# (B) Slice the needed portion of feature_pe: (neof, embedding_dim) -> (1, 1, neof, embedding_dim)
feature_pos_emb = self.feature_pe[:neof].unsqueeze(0).unsqueeze(1)
# (C) Combine via addition, broadcasting: (1, T, neof, embedding_dim)
combined_pos_emb = time_pos_emb + feature_pos_emb
# (D) Map embedding_dim -> 1 to get a scalar offset per (t, feature): shape => (1, T, neof, 1) -> squeeze => (1, T, neof)
combined_pos_emb = self.linear(combined_pos_emb).squeeze(-1)
# (E) Broadcast over batch dimension: (batch_size, T, neof)
X_with_pos = X + combined_pos_emb
return X_with_pos
class FeaturePositionalEmbedding(nn.Module):
"""
Creates fixed (non-learnable) positional embeddings for the feature dimension only.
Given an input (batch_size, T, neof), we produce a feature-wise offset
of shape (1, 1, neof), then broadcast-add it to the input.
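
    Example (a minimal sketch; dimensions are assumed values):

    >>> import torch
    >>> emb = FeaturePositionalEmbedding(max_neof=16, embedding_dim=32)
    >>> X = torch.randn(2, 12, 8)
    >>> emb(X).shape
    torch.Size([2, 12, 8])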
"""
def __init__(self, max_neof, embedding_dim=64):
"""
Args:
max_neof (int): maximum feature dimension for which to create embeddings
embedding_dim (int): dimensionality of the sine-cosine embedding
"""
super().__init__()
# Precompute sine-cosine embeddings for [0..max_neof - 1]
feature_pe = torch.zeros(max_neof, embedding_dim) # (max_neof, embedding_dim)
position_f = torch.arange(0, max_neof, dtype=torch.float).unsqueeze(1) # (max_neof, 1)
# Exponential spacing of frequencies, as in the Transformer paper
div_term = torch.exp(
torch.arange(0, embedding_dim, 2).float() * (-math.log(10000.0) / embedding_dim)
)
# Apply sine to even indices, cosine to odd indices
feature_pe[:, 0::2] = torch.sin(position_f * div_term)
feature_pe[:, 1::2] = torch.cos(position_f * div_term)
# Register as a buffer (not a parameter)
self.register_buffer("feature_pe", feature_pe) # (max_neof, embedding_dim)
# Linear layer to map embedding_dim -> 1 scalar per feature position
# If you prefer a neof-dimensional offset, use nn.Linear(embedding_dim, neof).
self.linear = nn.Linear(embedding_dim, 1)
def forward(self, X):
"""
Args:
X: Tensor of shape (batch_size, T, neof)
Returns:
Tensor of the same shape, with a feature-wise positional offset added.
"""
batch_size, T, neof = X.shape
# Slice the needed portion of feature_pe: shape => (neof, embedding_dim)
# Then reshape to (1, 1, neof, embedding_dim) for broadcast
feature_pos_emb = self.feature_pe[:neof].unsqueeze(0).unsqueeze(0)
# (1, 1, neof, embedding_dim)
# Map (embedding_dim) -> 1 scalar per feature
# Result shape => (1, 1, neof, 1) => squeeze => (1, 1, neof)
feature_pos_emb = self.linear(feature_pos_emb).squeeze(-1)
# (1, 1, neof)
# Broadcast-add to X, which is (batch_size, T, neof)
# feature_pos_emb expands over batch_size and T
X_with_pos = X + feature_pos_emb
return X_with_pos
[docs]
class SymmetricFeatureScaler(BaseEstimator, TransformerMixin):
"""
Scales each feature to a symmetric range [-scale, +scale] around zero,
following the scikit-learn transformer API.
For feature i, the data's minimum is mapped to -feature_scales[i],
and the data's maximum is mapped to +feature_scales[i].
Parameters
----------
feature_scales : array-like of shape (n_features,), optional (default=None)
If None, all features are scaled to [-1, +1].
Otherwise, each feature i is scaled to [-feature_scales[i], +feature_scales[i]].
Attributes
----------
data_min_ : ndarray of shape (n_features,)
Per-feature minimum seen in the data during fit.
data_max_ : ndarray of shape (n_features,)
Per-feature maximum seen in the data during fit.
n_features_ : int
Number of features in the fitted data.
feature_scales_ : ndarray of shape (n_features,)
Final validated array of per-feature scales.
Example
-------
>>> import numpy as np
>>> X = np.array([[1, 10], [2, 20], [3, 30]], dtype=float)
>>> # Suppose we want feature 0 scaled to [-1, +1] and feature 1 to [-5, +5]
>>> feature_scales = [1.0, 5.0]
>>> scaler = SymmetricFeatureScaler(feature_scales=feature_scales)
>>> scaler.fit(X)
SymmetricFeatureScaler(...)
>>> X_scaled = scaler.transform(X)
>>> X_scaled
array([[-1. , -5. ],
[ 0. , 0. ],
[ 1. , 5. ]])
>>> X_orig = scaler.inverse_transform(X_scaled)
>>> X_orig
array([[ 1., 10.],
[ 2., 20.],
[ 3., 30.]])
"""
def __init__(self, feature_scales=None):
self.feature_scales = feature_scales
def fit(self, X, y=None):
"""Learn the per-feature min and max from the training data."""
X = self._check_array(X)
self.n_features_ = X.shape[1]
# Compute data min and max per feature
self.data_min_ = X.min(axis=0)
self.data_max_ = X.max(axis=0)
# If feature_scales is None, default each feature to 1.0 => [-1, +1]
if self.feature_scales is None:
self.feature_scales_ = np.ones(self.n_features_, dtype=float)
else:
self.feature_scales_ = np.array(self.feature_scales, dtype=float)
if self.feature_scales_.shape != (self.n_features_,):
raise ValueError(
f"`feature_scales` must be of shape (n_features,), "
f"but got {self.feature_scales_.shape}."
)
return self
def _check_array(self, X):
"""Ensure X is at least 2D."""
X = np.array(X, dtype=float)
if X.ndim == 1:
X = X.reshape(-1, 1)
return X
def _check_is_fitted(self):
"""Verify that fit has been called by checking learned attributes."""
if not hasattr(self, 'data_min_') or not hasattr(self, 'data_max_'):
raise AttributeError(
"This SymmetricFeatureScaler instance is not fitted yet. "
"Call 'fit' with appropriate arguments before using this method."
)
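
    def transform(self, X):
        """
        Scale each feature to ``[-feature_scales_[i], +feature_scales_[i]]``.

        A sketch implementation of the mapping documented above: the
        per-feature minimum maps to ``-feature_scales_[i]`` and the
        per-feature maximum to ``+feature_scales_[i]``.
        """
        self._check_is_fitted()
        X = self._check_array(X)
        # Avoid division by zero for constant features
        data_range = np.where(self.data_max_ > self.data_min_,
                              self.data_max_ - self.data_min_, 1.0)
        # [data_min_, data_max_] -> [0, 1] -> [-scale, +scale]
        X_std = (X - self.data_min_) / data_range
        return 2.0 * self.feature_scales_ * X_std - self.feature_scales_

    def inverse_transform(self, X_scaled):
        """
        Map data from ``[-feature_scales_[i], +feature_scales_[i]]``
        back to the original per-feature range.
        """
        self._check_is_fitted()
        X_scaled = self._check_array(X_scaled)
        # [-scale, +scale] -> [0, 1] -> [data_min_, data_max_]
        X_std = (X_scaled + self.feature_scales_) / (2.0 * self.feature_scales_)
        return X_std * (self.data_max_ - self.data_min_) + self.data_min_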
class IdentityScaler(BaseEstimator, TransformerMixin):
"""
A no-op (identity) scaler that complies with scikit-learn's
estimator API. It leaves data unchanged.
"""
def fit(self, X, y=None):
# Nothing to compute
return self
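
    def transform(self, X):
        # Identity transform: return the data unchanged
        return X

    def inverse_transform(self, X):
        # Identity inverse: return the data unchanged
        return X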