Search code examples
pythonparallel-processingdaskpython-xarray

Xarray/Dask: map_block computation - variable size inconsistency


For a workable example of Python code:

import numpy as np
import xarray as xr
from scipy.optimize import curve_fit
import pandas as pd
import xesmf as xe

from dask.distributed import Client

def diurnal_cycle_model(hours, intercept, coef_cos, coef_sin):
    return intercept + coef_cos * np.cos(2 * np.pi * hours / 24) + coef_sin * np.sin(2 * np.pi * hours / 24)

def harm_fit(data):
    data = data.fillna(0)
    ndim = len(data.dims)
    hours = [3,9,15,21]
    
    params = xr.apply_ufunc(curve_fit, diurnal_cycle_model, hours, data,
                            input_core_dims=[[], ['step'], ['step']],
                            output_core_dims=[['param'], ['param1', 'param2']],
                            vectorize=True, dask='parallelized', output_dtypes=[float, float])
    
    intercept, coef_cos, coef_sin = params[0].isel(param=0), params[0].isel(param=1), params[0].isel(param=2)

    hours_reconstructed = xr.DataArray(data=np.arange(0,24),dims=['step'],coords={'step':pd.timedelta_range(start='0 hour',end='23 hours',freq='1H')})
    dc_reconstructed = diurnal_cycle_model(hours_reconstructed.expand_dims(dim={'time':intercept.time,'number':intercept.number,'lat':intercept.lat,'lon':intercept.lon},axis=(1,2,3,4)),
                                           intercept,
                                           coef_cos,
                                           coef_sin)
    return dc_reconstructed

if __name__ == '__main__':
    s2s_fcst = xr.DataArray(data=np.random.normal(size=(21,21,4,21,47)),
                            dims=['time','number','step','lat','lon'],
                            coords=dict(time=pd.date_range(start='2017-10-02',periods=21),
                                        step=pd.timedelta_range(start='3h',end='21h',freq='6h'),
                                        number=np.arange(0,21),
                                        lat=np.arange(15,-16.5,-1.5),
                                        lon=np.arange(90,160.5,1.5)),
                            name='tp')
    s2s_fit = xr.DataArray(dims=['time','number','step','lat','lon'],
                           coords=dict(time=s2s_fcst.time,number=s2s_fcst.number,
                                       step=pd.timedelta_range(start='0 day',end='46 day',freq='1 h'),
                                       lat=s2s_fcst.lat,lon=s2s_fcst.lon),
                           name='tp_fit')
    
    client = Client(n_workers=4)
    
    for l in range(0,2): #range(0,46):
        print('Lead '+str(l))
        rain_6hr = s2s_fcst[:,:,4*l:4*l+4,:,:]
        
        reshaped_ds = rain_6hr.transpose('step','time','number','lat','lon')
        reshaped_ds_chunk = reshaped_ds.chunk({'time': 3,'number':3})
        print(reshaped_ds_chunk)
        template = (xr.DataArray(data=np.zeros(shape=(24,21,21,21,47)),
                                dims=['step','time','number','lat','lon'],
                                coords=dict(time=s2s_fcst.time,number=s2s_fcst.number,
                                            step=pd.timedelta_range(start='0 hour',end='23 hours',freq='1H'),
                                            lat=s2s_fcst.lat,lon=s2s_fcst.lon))
                      .chunk({'time': 3,'number':3}))
        print(template)
        dc_harm_fit = reshaped_ds_chunk.map_blocks(harm_fit, template=template).compute(scheduler=client)
        print(dc_harm_fit)
        dc_harm_fit_transpose = dc_harm_fit.transpose('time','number','step','lat','lon')
        print(dc_harm_fit_transpose)

For this code, I am trying to fit the forecast rainfall data to the diurnal cycle model defined as diurnal_cycle_model(hours, intercept, coef_cos, coef_sin).

  1. Data structure: s2s_fcst is forecast rainfall data with a 6-hourly time step (step dimension). It has forecast initial time time, forecast time steps step, ensemble members number, and latitude/longitude. Here, data is substituted with random numbers for demonstration.
  2. I am trying to fit the diurnal cycle on the step dimension for each forecast day, i.e. the first forecast days (loop over l) include 0-3rd timesteps of step, and the second forecast days include the 4-7th timesteps of step.
  3. harm_fit requires a long computation time. Therefore, I chunk the data and input it to map_blocks(harm_fit).
  4. In harm_fit, we regress the 4 forecast timesteps on the diurnal_cycle_model and obtain the regression coefficients. Then we reconstruct the hourly rainfall (24 time steps, with 1-hour frequency). Therefore, the output data after harm_fit has the step with 24 steps instead of the input of 4 steps. The template for the map_block has a step dimension of 24 hours.
  5. After compute(), the error message occurs as follows:

Exception has occurred: ValueError
replacement data must match the Variable's shape. replacement data has shape (3, 21, 147, 47, 24); Variable has shape (24, 21, 21, 21, 47)
File "/Users/waynetsai/Desktop/test.py", line 62, in dc_harm_fit = reshaped_ds_chunk.map_blocks(harm_fit, template=template).compute(scheduler=client)

How could I fix the issue? What's wrong with the code?

Thanks for your assistance!


Solution

  • The shape mismatch between your "harm_fit" function's output and the template you're giving "map_blocks" is the root of the problem. While the variable (template) has shape (24, 21, 21, 21, 47), the error indicates that the replacement data (the output of harm_fit) has shape (3, 21, 147, 47, 24).

    This shape mismatch is most likely caused by the fact that the input chunk size (3 steps) and the output of "harm_fit" (24 steps) have different step dimension sizes. It appears that the output has a different step dimension size when using "map_blocks", even though the function is supposed to produce chunks that match the size of the input chunks.

    Making sure that the "input chunk" size and the "harm_fit" output have the same step dimension size is necessary to correct this. The "harm_fit" function can be changed to only return the pertinent steps to accomplish this, or the logic in "harm_fit" and the template can be modified to account for the various step sizes.

    I hope that this helps.

    EDIT:

    Here's a modified version of your code using "map" instead of "map_blocks":

    def harm_fit(data):
        data = data.fillna(0)
        ndim = len(data.dims)
        hours = [3, 9, 15, 21]
    
        params = xr.apply_ufunc(curve_fit, diurnal_cycle_model, hours, data,
                                input_core_dims=[['step'], ['step'], ['step']],
                                output_core_dims=[['param'], ['param1', 'param2']],
                                vectorize=True, dask='parallelized', output_dtypes=[float, float])
    
        intercept, coef_cos, coef_sin = params[0].isel(param=0), params[0].isel(param=1), params[0].isel(param=2)
    
        # create a new template for the output
        template = (xr.DataArray(data=np.zeros(shape=(24, 21, 21, 21, 47)),
                                 dims=['step', 'time', 'number', 'lat', 'lon'],
                                 coords=dict(time=s2s_fcst.time, number=s2s_fcst.number,
                                             step=pd.timedelta_range(start='0 hour', end='23 hours', freq='1H'),
                                             lat=s2s_fcst.lat, lon=s2s_fcst.lon))
                    .chunk({'time': 3, 'number': 3}))
    
        # apply the function using map
        dc_harm_fit = xr.map_blocks(harm_fit, data, template=template)
    
        return dc_harm_fit