Interpolation for Ocean and Atmospheric Data
Routines and methods for interpolation of ocean and atmospheric model
field, either on regular grids or rotated, multi-pole grids.
For the ocean, is set for the CMCC Ocean model at the nominal resolution of 0.25
The staggering requires different interpolators operators for scalar points at T-points
and vector quantities carried at (u,v) points. For the moment a simple interpolation is carried out
but a more accurate vector interpolation is under development.
The interpolation is obtained by triangulation of the starting grid and seaprate interpolation to the new grid.
The weights are preserved and they can be used for repeated application of the same set of grids.
| **Atmosphere_Interpolator**
| **Ocean_Interpolator**
import os
import math
import numpy as np
import pickle
import gzip
import scipy.linalg as sc
import scipy.special as sp
import scipy.interpolate as spint
import scipy.spatial.qhull as qhull
from scipy.spatial import Delaunay
import zapata.lib as zlib
import xarray as xr
class Atmosphere_Interpolator():
This class creates weights for interpolation of atmospheric fields.
No mask is used.
This class create an interpolator operator to a *target grid* `tgt_grd`. The interpolator then
can be used to perform the actual interpolation.
The *target grid* must be a `xarray` `DataArray` with variables `lat` and `lon`
grid : str
Choice of output grids
* `1x1` -- Regular 1 degree
* `025x025` -- Coupled model grid, nominally 0.25,
option : str
'linear', interpolation method
name : str
Name of the interpolator
grid : str
Option for the grid
option : str
Interpolation method
It is a thin wrapper around `xarray` `interp_like` method.
Create the weights for interpolation
>>> w= zint.Atmosphere_Interpolator('1x1','linear')
Interpolate temperature
>>> target_xarray=w.interp_f(src_xarray)
__slots__ = ('name','tgt','choice')
def __init__(self, grid, option='linear'):
self.choice = option
''' str: Interpolation method selected `linear` or `nearest`'''
# Put here info on grids to be obtained from __call__
self.name = 'Atmosphere_Interpolator'
'''str: Name of the Interpolator'''
if grid == '1x1':
# Selected regular 1 Degree grid
lon1x1 = np.linspace(0,359,360)
lat1x1 = np.linspace(-90,90,180)
tgt = xr.DataArray(mm,dims=['lat','lon'],\
elif grid == '025x025':
homedir = os.path.expanduser("~")
file = homedir + '/Dropbox (CMCC)/data_zapata/'+ 'masks_CMCC-CM2_VHR4_AGCM.nc'
lat25 = dst.yc.data[:,0]
lon25 = dst.xc.data[0,:]
tgt = xr.DataArray(dst.mask,dims=['lat','lon'],\
SystemError(f'Wrong Option in {self.name} --> {grid}')
self.tgt = tgt
'''xarray: Target grid'''
def __call__(self):
def __repr__(self):
''' Printing other info '''
return '\n'
def interp_scalar(self, xdata):
Perform interpolation to the target grid.
This methods can be used for scalar quantities.
xdata : xarray
2D array to be interpolated, it must be on the `src_grid`
out : xarray
Interpolated xarray on the target grid
res = xdata.interp_like(self.tgt,method=self.choice)
return res
class Ocean_Interpolator():
"""This class creates weights for interpolation of ocean fields.
This class create an interpolator operator from a *source grid*
`src_grid` to a *target grid* `tgt_grd`. The interpolator then
can be used to perform the actual interpolation. The model uses
an Arakawa C-grid, that is shown in the following picture. The f-points
correspond to the points where the Coriolis terms are carried.
.. image:: ../resources/NEMOgrid.png
:scale: 25 %
:align: right
The Arakawa C-grid used in the ocean model show also the ordering
of the points, indicating which points correspond to the (i,j) index.
The *source grid* must be a `xarray` `DataSet` containing coordinates
`latitude` and `longitude`.
The *target grid* must be a `xarray` `DataArray` with variables `lat` and `lon`
Border land points at all levels are covered by a convolution value using a
window that can be changed in `sea_over_land`.
Works only on single `DataArray`
src_grid : xarray
Source grid
tgt_grid : xarray
Target grid
level : float
Depth to generate the interpolator
window : int
Window for sea over land
period : int
Minimum number of points in the sea-over-land process
verbose: bool
Lots of output
tgt_grid :
Target grid
mask :
Mask of the target grid
vt :
wt :
mdir :
Directory for masks files
ingrid :
Input Grid, `src_grid_name`
outgrid :
Output Grid `tgt_grid_name`
window :
Window for convolution Sea-Over-Land (default 3)
period :
Minimum number of points into the window (default 1)
T_lon :
Longitudes of input T-mask
T_lat :
Latitudes of input T-mask
U_lon :
Longitudes of input U-mask
U_lat :
Latitudes of input U-mask
V_lon :
Longitudes of input V-mask
V_lat :
Latitudes of input V-mask
tangle :
Angles of the T points of the input grid
mask_reg :
Mask of the Target grid
cent_long :
Central Longitude of the Target grid
name :
Name of the Interpolator Object
level :
Level of the Interpolator Object
Interp_T :
Interpolate Scalar quantities at T points
Interp_UV :
Interpolate Vector Velocities at (U,V)
mask_sea_over_land :
Mask border point for Sea over land
UV_sea_over_land :
Fill U,V values over land
to_file :
Writes interpolator object to file (pickled format)
Create the weights for interpolation
>>> w= zint.Ocean_Interpolator(src_grid,tgt_grid)
Interpolate temperature
>>> target_xarray=w.interp_T(src_xarray,method='linear')
Interpolate U,V
>>> target_xarray=w.interp_UV(U_xarray,V_xarray,method='linear')
__slots__ = ('mask','masku','maskv','mdir', 'mask_reg', \
'sea_index','sea_index_U','sea_index_V', \
'latlon', 'masT_vec', 'maskub','maskvb','masktb',\
'latlon_reg','sea_index_reg','regmask_vec', \
def __init__(self, src_grid_name, tgt_grid_name,level=1,verbose=False,window=3,period=1):
# Put here info on grids to be obtained from __call__
# This currently works with mask files
# 'masks_CMCC-CM2_VHR4_AGCM.nc'
# and
# 'ORCA025L50_mesh_mask.nc'
# Path to auxiliary Ocean files
homedir = os.path.expanduser("~")
self.mdir = homedir + '/Dropbox (CMCC)/data_zapata'
self.ingrid = src_grid_name
self.outgrid = tgt_grid_name
# Parameter for sea over land
self.window = window
self.period = period
# Check levels
if level > 0:
SystemError(f' Surface variable not available, Level {level}')
#Resolve grids
s_in = self._resolve_grid(src_grid_name,level)
tk = s_in['tmask']
self.T_lon = s_in['lonT']
self.T_lat = s_in['latT']
mask = tk.assign_coords({'lat':self.T_lat,'lon':self.T_lon}).drop_vars(['U_lon','U_lat','V_lon','V_lat','T_lon','T_lat']).rename({'z':'deptht'})
tk = s_in['umask']
self.U_lon = s_in['lonU']
self.U_lat = s_in['latU']
masku = tk.assign_coords({'lat':self.U_lat,'lon':self.U_lon}).drop_vars(['U_lon','U_lat','V_lon','V_lat','T_lon','T_lat']).rename({'z':'deptht'})
tk = s_in['vmask']
self.V_lon = s_in['lonV']
self.V_lat = s_in['latV']
maskv = tk.assign_coords({'lat':self.V_lat,'lon':self.V_lon}).drop_vars(['U_lon','U_lat','V_lon','V_lat','T_lon','T_lat']).rename({'z':'deptht'})
self.name = 'UV Velocity'
self.level = str(level)
#Fix Polar Fold
mask[-3:,:] = False
#T angles
self.tangle = s_in['tangle']
#Sea over land
self.mask = xr.where(mask !=0, 1,np.nan)
self.masktb,dum = self.mask_sea_over_land(mask)
self.maskub,self.masku = self.mask_sea_over_land(masku)
self.maskvb,self.maskv = self.mask_sea_over_land(maskv)
print(f' Generating interpolator for {self.name}')
if verbose:
# Get triangulation for all grids
self.latlon,self.sea_index, self.masT_vec = get_sea(self.mask)
self.tri_sea_T = Delaunay(self.latlon) # Compute the triangulation for T
print(f' computing the triangulation for T grid')
latlon_U,self.sea_index_U, masU_vec = get_sea(self.masku)
self.tri_sea_U = Delaunay(latlon_U) # Compute the triangulation for U
print(f' computing the triangulation for U grid')
latlon_V,self.sea_index_V, masT_vec = get_sea(self.maskv)
self.tri_sea_V = Delaunay(latlon_V) # Compute the triangulation for V
print(f' computing the triangulation for V grid')
#Target Grid
s_out = self._resolve_grid(tgt_grid_name,level,verbose=verbose)
self.mask_reg = s_out['tmask']
self.cent_long = s_out['cent_long']
self.latlon_reg,self.sea_index_reg,self.regmask_vec = get_sea(self.mask_reg)
def __call__(self):
print(f' Interpolator for T,U,V GLORS data ')
print(f' This is for level at depth {self.level} m')
print(f' Main methods interp_T and interp_UV')
def __repr__(self):
''' Printing other info '''
print(f' Interpolator for T,U,V GLORS data ')
print(f' This is for level at depth {self.level} m')
return '\n'
def interp_T(self, xdata, method='linear'):
Perform interpolation for T Grid point to the target grid.
This methods can be used for scalar quantities.
xdata : xarray
2D array to be interpolated, it must be on the `src_grid`
method : str
Method for interpolation
* 'linear' , Use linear interpolation
* 'nearest' , use nearest interpolation
out : xarray
Interpolated xarray on the target grid
# Compute interpolation T
Tstack = xdata.stack(ind=('y','x'))
sea_T = Tstack[self.sea_index]
temp = xr.full_like(self.regmask_vec,np.nan)
if method == 'linear':
interpolator = spint.LinearNDInterpolator(self.tri_sea_T, sea_T)
elif method == 'nearest':
interpolator = spint.NearestNDInterpolator(self.tri_sea_T, sea_T)
SystemError(f' Error in interp_T , wrong method {method}')
T_reg = interpolator(self.latlon_reg)
temp[self.sea_index_reg] = T_reg.data
out = temp.unstack()
#Fix dateline problem
if self.outgrid == 'L44_025_REG_GLO':
out[:,1439] = out[:,1438] + delx*(out[:,1]-out[:,1438])/ddelx
out[:,0] = out[:,1438] + 2*delx*(out[:,1]-out[:,1438])/ddelx
return out
def interp_UV(self, udata, vdata, method = 'linear'):
Perform interpolation for U,V Grid point to the target grid.
This methods can be used for vector quantities.
The present method interpolates the U,V points to the T points,
rotates them and then interpolates to the target grid.
udata,vdata : xarray
2D array to be interpolated, it must be on the `src_grid`
out : xarray
Interpolated xarray on the target grid
# Insert NaN
udata = xr.where(udata < 200,udata, np.nan)
vdata = xr.where(vdata < 200,vdata, np.nan)
#udata,vdata = self.UV_sea_over_land(udata, vdata, self.masku,self.maskv)
udata = self.fill_sea_over_land(udata,self.masku)
vdata = self.fill_sea_over_land(vdata,self.maskv)
# Compute interpolation for U,V
Ustack = udata.stack(ind=('y','x'))
Vstack = vdata.stack(ind=('y','x'))
sea_U = Ustack[self.sea_index_U]
sea_V = Vstack[self.sea_index_V]
#Interpolate to T_grid
if method == 'linear':
int_U = spint.LinearNDInterpolator(self.tri_sea_U, sea_U)
int_V = spint.LinearNDInterpolator(self.tri_sea_V, sea_V)
elif method == 'nearest':
int_U = spint.NearestNDInterpolator(self.tri_sea_U, sea_U)
int_V = spint.NearestNDInterpolator(self.tri_sea_V, sea_V)
SystemError(f' Error in interp_UV , wrong method {method}')
U_on_T = int_U(self.latlon)
V_on_T = int_V(self.latlon)
UT = self.masT_vec.copy()
VT = self.masT_vec.copy()
UT[self.sea_index] = U_on_T.data
VT[self.sea_index] = V_on_T.data
UT = UT.unstack()
VT = VT.unstack()
#Rotate Velocities
fac = self.tangle*math.pi/180.
uu = (UT * np.cos(fac) - VT * np.sin(fac)).drop_vars(['nav_lon','nav_lat'])
vv = (VT * np.cos(fac) + UT * np.sin(fac)).drop_vars(['nav_lon','nav_lat'])
# Interpolate to regular grid
Uf = self.interp_T( uu, method=method)
Vf = self.interp_T( vv, method=method)
#Fix dateline problem
if self.outgrid == 'L44_025_REG_GLO':
Uf[:,1439] = Uf[:,1438] + delx*(Uf[:,1]-Uf[:,1438])/ddelx
Uf[:,0] = Uf[:,1438] + 2*delx*(Uf[:,1]-Uf[:,1438])/ddelx
return Uf,Vf
def to_file(self, filename):
This method writes to file the interpolating object in
pickled format
with gzip.open(filename, 'wb') as output: # Overwrites any existing file.
pickle.dump(self, output, pickle.HIGHEST_PROTOCOL)
def mask_sea_over_land(self,mask):
Mask border point for `Sea over land`.
Sea over land is obtained by forward and backward filling NaN land
value with an arbitrary value, then athey are masked to reveal only the
coastal points.
original mask
border mask
masknan=xr.where(mask != 0, 1, np.nan)
um1 = masknan.ffill(dim='x',limit=1).fillna(0.)+ masknan.bfill(dim='x',limit=1).fillna(0.)- 2*masknan.fillna(0)
um2 = masknan.ffill(dim='y',limit=1).fillna(0.)+ masknan.bfill(dim='y',limit=1).fillna(0.)- 2*masknan.fillna(0)
bord = (um1+um2)/2
um = bord + masknan.fillna(0)
um = xr.where(um!=0, 1,np.nan)
mb = xr.where(bord !=0, 1,np.nan)
return mb, um
def _resolve_grid(self,ingrid,level,verbose=False):
Internal routine to resolve grid informations
if ingrid == 'L75_025_TRP_GLO':
print(f' Tripolar L75 0.25 Grid -- {ingrid}')
grid = xr.open_dataset(self.mdir + '/L75_025_TRP_GLO/tmask_UVT_latlon_coordinates.nc').sel(z=level)
geo = xr.open_dataset(self.mdir + '/L75_025_TRP_GLO/NEMO_coordinates.nc')
angle = xr.open_dataset(self.mdir + '/L75_025_TRP_GLO/ORCA025L75_angle.nc')
struct={'tmask': grid.tmask, 'umask': grid.umask,'vmask': grid.vmask, 'tangle': angle.tangle, \
'lonT': geo.glamt,'latT':geo.gphit,'lonU':geo.glamu, \
'latU':geo.gphiu,'lonV':geo.glamv,'latV':geo.gphiv }
elif ingrid == 'L44_025_TRP_GLO':
print(f' Tripolar L44 0.25 Grid -- {ingrid}')
grid = xr.open_dataset(self.mdir + '/L44_025_TRP_GLO/tmask44_UVT_latlon_coordinates.nc').sel(z=level)
angle = xr.open_dataset(self.mdir + '/L75_025_TRP_GLO/ORCA025L75_angle.nc')
geo = xr.open_dataset(self.mdir + '/L75_025_TRP_GLO/NEMO_coordinates.nc')
struct={'tmask': grid.tmask, 'umask': grid.umask,'vmask': grid.vmask, 'tangle': angle.tangle, \
'lonT': geo.glamt,'latT':geo.gphit,'lonU':geo.glamu, \
'latU':geo.gphiu,'lonV':geo.glamv,'latV':geo.gphiv }
elif ingrid == 'L75_025_REG_GLO':
print(f' Regular L75 0.25 Lat-Lon Grid -- {ingrid}')
grid = xr.open_dataset(self.mdir + '/L75_025_REG_GLO/GLO-MFC_001_025_mask_bathy.nc').sel(z=level). \
struct={'tmask': grid.mask, 'tangle': None }
elif ingrid == 'L44_025_REG_GLO':
print(f' Regular L44 0.25 Lat-Lon Grid from WOA -- {ingrid}')
grid = xr.open_dataset(self.mdir + '/WOA/m025x025L44.nc').sel(depth=level)
cent_long = 720
struct={'tmask': grid.m025x025L44, 'tangle': None ,'cent_long' : cent_long}
SystemError(f'Wrong Option in _resolve_grid --> {ingrid}')
if verbose:
print(f'Elements of {ingrid} extracted \n ')
for i in struct.keys():
print(f' {i} \n')
return struct
def fill_sea_over_land(self,U,u_mask):
Put values Sea over land.
Using the mask of the border points, the border points are filled
with the convolution in 2D, using a window of width `window`, here
The min_periods value is controlling the minimum number of points
within the window that is necessary to yield a result.
They can be fixed as attributes of the interpolator
Field to be treated
Mask for the field
Filled array
r = U.rolling(x=self.window, y=self.window, min_periods=self.period,center=True)
border = ~np.isnan(self.maskub).drop_vars({'lat','lon'}).stack(ind=self.maskub.dims)
rs = r1.stack(ind=r1.dims)
UU[border] = rs[border]
UUU = UU.unstack()
UUU = UUU.assign_coords({'lon':u_mask.lon,'lat':u_mask.lat})
return UUU
def get_sea(maskT):
Obtain indexed coordinates for sea points
# Try Interpolation
tm = zlib.putna(-0.1,0.1,maskT)
sea_index = ~np.isnan(tm).stack(ind=maskT.dims)
maskT_vec = tm.stack(ind=maskT.dims)
land_point = maskT_vec[~sea_index]
sea_point = maskT_vec[sea_index]
land_point.name = 'Land'
sea_point.name = 'Sea'
# Compute triangularization for sea points
return latlon, sea_index, maskT_vec
def from_file(file):
Read interpolator object from `file`
with gzip.open(file, 'rb') as input:
w = pickle.load(input)
return w