Very slow iterations in a loop over a DataArray using Xarray and Dask

I am trying to calculate wind speed from the u and v components, for data at an hourly timestep and 0.1 x 0.1 degree resolution, over a total of 40 years. The individual u and v netCDF files for one year are about 5 GB each. I have implemented a basic for loop in which the u and v netCDF files for each year are opened with xarray's open_dataset and rechunked to get them as Dask arrays, the wind speed is calculated, and the result is exported as a new netCDF file. When the loop runs, the first iteration completes almost instantaneously, but the next iteration takes so long that it appears to have stalled. I do not understand which part of my code is the bottleneck here, or why. I have also set up the Dask scheduler to request resources adaptively. The relevant code snippet is attached for reference:

    import gc
    import glob

    import numpy as np
    import xarray as xr
    from dask.distributed import Client
    from dask_jobqueue import PBSCluster

    cluster = PBSCluster(cores=1, memory='8GB', queue='standard', project='civil',
                         interface='ib0', walltime='00:20:00')
    cluster.adapt(minimum=1, maximum=8)
    client = Client(cluster)

    for i in range(1979, 2019):
        u_dir = glob.glob('../u_wind/uwind_hourly_' + str(i) + '*.nc')
        v_dir = glob.glob('../v_wind/vwind_hourly_' + str(i) + '*.nc')
        w_dir = './wind/wind_hourly_' + str(i) + '-' + str(i) + '.nc'
        u_wind = xr.open_dataset(u_dir[0])
        v_wind = xr.open_dataset(v_dir[0])
        # rechunk so the variables are backed by dask arrays
        u_wind_rechunk = u_wind.chunk({'time': 720})
        v_wind_rechunk = v_wind.chunk({'time': 720})
        u_var = u_wind_rechunk['UGRD_10m']
        v_var = v_wind_rechunk['VGRD_10m']
        wind_speed = xr.Dataset(data_vars=None, coords=None, attrs=None)
        wind_speed = wind_speed.assign(wind_speed=np.sqrt(u_var**2 + v_var**2))
        wind_speed.to_netcdf(w_dir)
        del u_wind, v_wind, u_wind_rechunk, v_wind_rechunk, u_var, v_var, wind_speed
        gc.collect()


Answer

As it is, your code still runs serially rather than in parallel: wind_speed.to_netcdf(w_dir) triggers computation right away, so each loop iteration blocks until that year's result has been fully computed and written before the next one starts. The code below might require some adjustment, but the main point is to parallelise your operations by wrapping each year's work in dask.delayed and computing all of them at once:

    def single_run(i):
        # nothing is modified below relative to the original loop body;
        # assumes the same imports (glob, gc, numpy as np, xarray as xr) as above
        u_dir = glob.glob('../u_wind/uwind_hourly_' + str(i) + '*.nc')
        v_dir = glob.glob('../v_wind/vwind_hourly_' + str(i) + '*.nc')
        w_dir = './wind/wind_hourly_' + str(i) + '-' + str(i) + '.nc'
        u_wind = xr.open_dataset(u_dir[0])
        v_wind = xr.open_dataset(v_dir[0])
        u_wind_rechunk = u_wind.chunk({'time': 720})
        v_wind_rechunk = v_wind.chunk({'time': 720})
        u_var = u_wind_rechunk['UGRD_10m']
        v_var = v_wind_rechunk['VGRD_10m']
        wind_speed = xr.Dataset(data_vars=None, coords=None, attrs=None)
        wind_speed = wind_speed.assign(wind_speed=np.sqrt(u_var**2 + v_var**2))
        wind_speed.to_netcdf(w_dir)
        del u_wind, v_wind, u_wind_rechunk, v_wind_rechunk, u_var, v_var, wind_speed
        gc.collect()

    # new part: build one lazy task per year and compute them all together
    import dask

    run_me = dask.compute([dask.delayed(single_run)(i) for i in range(1979, 2019)])
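
If you prefer to keep the explicit loop, another option is xarray's deferred-write support: passing compute=False to to_netcdf returns a dask.delayed object instead of writing immediately, so you can collect one delayed write per year and hand them all to the scheduler in a single dask.compute call. A minimal sketch of that variant, reusing the same (hypothetical) file layout and variable names as above:

    import glob

    import dask
    import numpy as np
    import xarray as xr

    delayed_writes = []
    for i in range(1979, 2019):
        u_dir = glob.glob('../u_wind/uwind_hourly_' + str(i) + '*.nc')
        v_dir = glob.glob('../v_wind/vwind_hourly_' + str(i) + '*.nc')
        w_dir = './wind/wind_hourly_' + str(i) + '-' + str(i) + '.nc'
        # chunks= makes the variables dask-backed on open,
        # so the separate rechunk step is not needed
        u_var = xr.open_dataset(u_dir[0], chunks={'time': 720})['UGRD_10m']
        v_var = xr.open_dataset(v_dir[0], chunks={'time': 720})['VGRD_10m']
        wind_speed = xr.Dataset({'wind_speed': np.sqrt(u_var**2 + v_var**2)})
        # compute=False defers the write and returns a dask.delayed object
        delayed_writes.append(wind_speed.to_netcdf(w_dir, compute=False))

    # one call schedules every year's read, calculation and write at once
    dask.compute(*delayed_writes)

This keeps the whole pipeline lazy end to end, and the scheduler is then free to overlap the reads, the arithmetic and the writes across years.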