I am trying to calculate wind speed from u and v components for hourly data at 0.1 x 0.1 degree resolution, one year at a time, for a total of 40 years. The individual u and v netCDF files for one year are about 5 GB each. I have implemented a basic for loop in which the u and v netCDF files for each year are opened with xarray's open_dataset and rechunked into dask arrays; the wind speed is then calculated and the result exported as a new netCDF file.
When the loop runs, the first iteration completes almost instantaneously, but the next iteration takes far too long (to the point where the loop appears to have stalled). I do not understand which part of my code is the bottleneck here, or why. Any help would be appreciated. Note that I have set up the dask scheduler to request resources adaptively. The relevant code snippet is attached for reference:
import gc
import glob

import numpy as np
import xarray as xr
from dask.distributed import Client
from dask_jobqueue import PBSCluster

cluster = PBSCluster(cores=1, memory='8GB', queue='standard', project='civil',
                     interface='ib0', walltime='00:20:00')
cluster.adapt(minimum=1, maximum=8)
client = Client(cluster)

for i in range(1979, 2019):
    u_dir = glob.glob('../u_wind/uwind_hourly_' + str(i) + '*.nc')
    v_dir = glob.glob('../v_wind/vwind_hourly_' + str(i) + '*.nc')
    w_dir = './wind/wind_hourly_' + str(i) + '-' + str(i) + '.nc'

    u_wind = xr.open_dataset(u_dir[0])
    v_wind = xr.open_dataset(v_dir[0])
    u_wind_rechunk = u_wind.chunk({'time': 720})
    v_wind_rechunk = v_wind.chunk({'time': 720})
    u_var = u_wind_rechunk['UGRD_10m']
    v_var = v_wind_rechunk['VGRD_10m']

    wind_speed = xr.Dataset(data_vars=None, coords=None, attrs=None)
    wind_speed = wind_speed.assign(wind_speed=np.sqrt(u_var**2 + v_var**2))
    wind_speed.to_netcdf(w_dir)

    del u_wind, v_wind, u_wind_rechunk, v_wind_rechunk, u_var, v_var, wind_speed
    gc.collect()
Answer
As it is, your code is still serial rather than parallel: specifically, wind_speed.to_netcdf(w_dir) triggers computation right away, so each year is read, computed, and written before the next one starts. The code below might require some adjustment, but the main point is to parallelise the per-year operations:
def single_run(i):
    # nothing below is modified relative to the original loop body
    u_dir = glob.glob('../u_wind/uwind_hourly_' + str(i) + '*.nc')
    v_dir = glob.glob('../v_wind/vwind_hourly_' + str(i) + '*.nc')
    w_dir = './wind/wind_hourly_' + str(i) + '-' + str(i) + '.nc'

    u_wind = xr.open_dataset(u_dir[0])
    v_wind = xr.open_dataset(v_dir[0])
    u_wind_rechunk = u_wind.chunk({'time': 720})
    v_wind_rechunk = v_wind.chunk({'time': 720})
    u_var = u_wind_rechunk['UGRD_10m']
    v_var = v_wind_rechunk['VGRD_10m']

    wind_speed = xr.Dataset(data_vars=None, coords=None, attrs=None)
    wind_speed = wind_speed.assign(wind_speed=np.sqrt(u_var**2 + v_var**2))
    wind_speed.to_netcdf(w_dir)

    del u_wind, v_wind, u_wind_rechunk, v_wind_rechunk, u_var, v_var, wind_speed
    gc.collect()

# new parts: wrap each year in dask.delayed and compute all years in parallel
import dask

run_me = dask.compute([dask.delayed(single_run)(i) for i in range(1979, 2019)])
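A related option worth knowing about: xarray's to_netcdf accepts compute=False, which returns a dask Delayed object instead of writing immediately, so the writes themselves stay lazy and can all be computed in one go. Below is a minimal sketch, assuming the same directory layout, chunking, and variable names as in your snippet; it may need adjustment for your setup:

import glob

import dask
import numpy as np
import xarray as xr

# Build one deferred write per year, then compute them all at once so
# dask can overlap the reads, the arithmetic, and the writes.
writes = []
for i in range(1979, 2019):
    u_dir = glob.glob('../u_wind/uwind_hourly_' + str(i) + '*.nc')
    v_dir = glob.glob('../v_wind/vwind_hourly_' + str(i) + '*.nc')
    w_dir = './wind/wind_hourly_' + str(i) + '-' + str(i) + '.nc'

    # chunks=... opens the data lazily as dask arrays from the start
    u_var = xr.open_dataset(u_dir[0], chunks={'time': 720})['UGRD_10m']
    v_var = xr.open_dataset(v_dir[0], chunks={'time': 720})['VGRD_10m']

    wind_speed = np.sqrt(u_var**2 + v_var**2).to_dataset(name='wind_speed')

    # compute=False defers the write and returns a dask Delayed
    writes.append(wind_speed.to_netcdf(w_dir, compute=False))

dask.compute(*writes)

The trade-off is that this exposes the whole 40-year task graph to the scheduler at once, so it can saturate your adaptive workers, whereas the dask.delayed(single_run) version keeps each year's work as one opaque task.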