I’m new to Python and I have a concurrency problem when calling functions from an imported library. My code computes several kinds of variables and, in a final step, saves them to different files; I hit the same problem whether I read or write.
This is an example that works because it runs sequentially:
import xarray as xr

def read_concurrent_files(self):
    files_var_type1 = self.get_files('type1', '20200101', '20200127')
    files_var_type2 = self.get_files('type2', '20200101', '20200127')
    files_var_type3 = self.get_files('type3', '20200101', '20200127')

def get_files(self, varType, dateini, datefin):
    # get_file_list returns a list of file paths for this variable type and date range
    files = self.get_file_list(varType, dateini, datefin)
    files_raw = xr.open_mfdataset(files, engine='cfgrib',
                                  combine='nested', concat_dim='time',
                                  decode_coords=False, parallel=True)
    return files_raw
But when I change the code to run concurrently, it fails:
import xarray as xr
from multiprocessing.pool import ThreadPool

def read_concurrent_files(self):
    pool = ThreadPool(processes=3)
    async_result1 = pool.apply_async(self.get_files, ('type1', '20200101', '20200127'))
    async_result2 = pool.apply_async(self.get_files, ('type2', '20200101', '20200127'))
    async_result3 = pool.apply_async(self.get_files, ('type3', '20200101', '20200127'))
    files_var_type1 = async_result1.get()
    files_var_type2 = async_result2.get()
    files_var_type3 = async_result3.get()

def get_files(self, varType, dateini, datefin):
    # get_file_list returns a list of file paths for this variable type and date range
    files = self.get_file_list(varType, dateini, datefin)
    files_raw = xr.open_mfdataset(files, engine='cfgrib',
                                  combine='nested', concat_dim='time',
                                  decode_coords=False, parallel=True)
    return files_raw
The problem is in the xr.open_mfdataset call, which does not seem to be thread-safe (or so I think).
Is there a way to confine the imported library to the method’s scope only?
I come from other languages where this was easy: create the instance inside the method, or use thread-safe objects.
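To make that concrete: Python does allow an import inside a method, but as far as I understand the module is cached in sys.modules, so every call (and every thread) still shares the same module object. A minimal sketch of what I mean:

import sys

def get_files_local_import():
    # The import statement may live inside the function body...
    import xarray as xr
    # ...but the module object is cached process-wide, so all callers
    # (and all threads) still share this same instance. A method-scoped
    # import therefore does not make a non-thread-safe library safe.
    assert sys.modules['xarray'] is xr
    return xr.__version__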
Thanks a lot in advance!!
Answer
As I’m new to Python, I was unaware of the different kinds of worker pools we can create. In my example above I was using a ThreadPool, whose threads are constrained by the GIL (Global Interpreter Lock), so to avoid that we can use a pool of processes instead. Here is an example:
import os
import concurrent.futures
import xarray as xr

def get_xarray(self):
    tasks = []
    cpu_count = os.cpu_count()
    with concurrent.futures.ProcessPoolExecutor(max_workers=cpu_count) as executor:
        for i in range(len(self.files)):
            tasks.append(executor.submit(self.get_xarray_by_file, self.files[i]))

    results = []
    for task in tasks:
        results.append(task.result())

    era_raw = xr.merge(results, compat='override')
    return era_raw.persist().load()

def get_xarray_by_file(self, files):
    # Each worker process opens its own subset of the GRIB files
    era_raw = xr.open_mfdataset(files, engine='cfgrib',
                                combine='nested', concat_dim='time',
                                decode_coords=False, parallel=True)
    return era_raw.persist().load()
In this case we use the ProcessPoolExecutor. From the Python documentation:
The ProcessPoolExecutor class is an Executor subclass that uses a pool of processes to execute calls asynchronously. ProcessPoolExecutor uses the multiprocessing module, which allows it to side-step the Global Interpreter Lock but also means that only picklable objects can be executed and returned.
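As a minimal illustration of that picklability constraint (my sketch, not part of the original answer): the callable you submit must be defined at module level so it can be pickled and sent to a worker process.

import concurrent.futures

def square(x):
    # A module-level function is picklable, so worker processes can run it.
    return x * x

if __name__ == '__main__':
    with concurrent.futures.ProcessPoolExecutor() as executor:
        print(list(executor.map(square, range(4))))  # [0, 1, 4, 9]
    # By contrast, submitting a lambda would fail when its result is
    # retrieved, because lambdas cannot be pickled for transfer to a worker.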
Now we can read GRIB2 files in parallel, or create NetCDF or CSV files from a dataframe, with true parallelism.
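For example, the same pattern can write one output file per worker process. A rough sketch (write_csv and the output paths are hypothetical, not from the original code):

import concurrent.futures
import pandas as pd

def write_csv(path):
    # Hypothetical helper: build a small DataFrame and save it as CSV.
    df = pd.DataFrame({'value': range(10)})
    df.to_csv(path, index=False)
    return path

if __name__ == '__main__':
    paths = [f'out_{i}.csv' for i in range(4)]  # hypothetical output names
    with concurrent.futures.ProcessPoolExecutor() as executor:
        # Each file is built and written in its own process, side-stepping the GIL.
        for written in executor.map(write_csv, paths):
            print('wrote', written)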