We forced a linear slab-ocean model (https://doi.org/10.1016/0011-7471(70)90043-4) with 250 GB of CCMP_v2 wind fields.
How do you do this in under 10 minutes? -- With the @pangeo_data stack (@xarray_dev, @zarr_dev, @dask_dev) and @numpy_team sped up by @numba_jit. https://twitter.com/RathWilli/status/1289171567885160448
First, convert all the data from netCDF to @zarr_dev using @xarray_dev, essentially:
xr.open_mfdataset("ccmp_v2_data/*.nc").to_zarr("ccmp_v2.zarr/")
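In practice that needs an import and, for an archive this size, explicit chunking helps; a minimal sketch (the chunk size and the combine/parallel options are assumptions, not the exact call used):
import xarray as xr

ds = xr.open_mfdataset(
    "ccmp_v2_data/*.nc",
    combine="by_coords",   # stitch the files together along their coordinates
    parallel=True,         # open the files in parallel via dask
)
ds = ds.chunk({"time": 240})   # assumed chunk size along time
ds.to_zarr("ccmp_v2.zarr/")    # write the Zarr store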
Then, apply Large and Pond's wind stress formula ( https://doi.org/10.1175/1520-0485(1981)011%3C0324:OOMFMI%3E2.0.CO;2) using plain @xarray_dev:
speed = (U**2 + V**2) ** 0.5  # wind speed
cd = xr.where(  # piecewise drag coefficient after Large & Pond (1981)
    speed < 11, 1.2e-3, 0.49e-3 + 0.065e-3 * speed
)
taux = U * cd * speed  # zonal wind-stress component
tauy = V * cd * speed  # meridional wind-stress component
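Here U and V are simply read back lazily from the Zarr store (a sketch; the variable names uwnd/vwnd are an assumption about the CCMP files):
import xarray as xr

ds = xr.open_zarr("ccmp_v2.zarr/")
U = ds["uwnd"]  # assumed name of the zonal wind component
V = ds["vwnd"]  # assumed name of the meridional wind component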
The next step is to prepare the actual model integration. We do it on the complex plane, so we need a complex @xarray_dev DataArray holding the wind stress:
T = taux + 1j * tauy  # zonal stress as the real part, meridional stress as the imaginary part
The model itself is a forward integration in time of the damped slab-ocean equation dZ/dt + (r + i f) Z = T / (rho H) from the paper linked above (Z = u + i v, r damping, f Coriolis parameter, rho H mixed-layer mass); d_1, d_2, and c_2 below are the coefficients of the discretized form:
def integrate(T, d_1):
    # d_2 and c_2 are constant model coefficients defined outside this function
    q = np.zeros_like(T)
    for l in range(2, T.shape[0]):
        q[l, ...] = (
            d_2 * q[l - 2, ...]
            + d_1[l - 1, ...] * q[l - 1, ...]
            + c_2 * T[l - 2, ...]
        )
    return q
But the loop is slow. So add a @numba_jit decorator:
@numba.njit
def integrate(T, d_1):
    [...]
    return q
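Spelled out for the one-dimensional time series that apply_ufunc (below) hands to the function, here is a minimal runnable sketch; d_2 and c_2 are placeholder values, not the real model coefficients:
import numba
import numpy as np

d_2 = 0.995 + 0.0j   # placeholder coefficient
c_2 = 1.0e-6 + 0.0j  # placeholder coefficient

@numba.njit
def integrate(T, d_1):
    # T and d_1 arrive as 1-D complex series, one grid point at a time
    q = np.zeros_like(T)
    for l in range(2, T.shape[0]):
        q[l] = d_2 * q[l - 2] + d_1[l - 1] * q[l - 1] + c_2 * T[l - 2]
    return q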
Forward integration is done with @xarray_dev and @dask_dev:
q = xr.apply_ufunc(
    integrate, T, d_1,
    vectorize=True,                        # loop the kernel over all grid points
    input_core_dims=[['time'], ['time']],
    output_core_dims=[['time']],
    output_dtypes=[np.complex128],         # one complex time series per grid point
    dask='parallelized'
)
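One practical detail: with dask='parallelized' the core dimension may not be split across chunks, so make sure time sits in a single chunk first (assuming d_1 is a dask-backed DataArray too):
T = T.chunk({"time": -1})       # -1 puts all time steps into one chunk
d_1 = d_1.chunk({"time": -1})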
The whole integration ran on a @dask_dev cluster of 16 full HPC nodes, deployed with Dask-jobqueue (https://jobqueue.dask.org/):
from dask_jobqueue import PBSCluster

cluster = PBSCluster()    # queue, walltime, etc. come from the jobqueue config
cluster.scale(jobs=16)    # submit 16 PBS jobs (one full node each in this setup)
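To actually kick off the computation, connect a client and write the result out; the output name below is just a placeholder:
from dask.distributed import Client

client = Client(cluster)   # workers appear as the PBS jobs start running
q.to_dataset(name="q").to_zarr("slab_response.zarr/")   # placeholder path; streams the result to disk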