
How to encapsulate an imported module into a method for multithreading in Python?

I'm new to Python and I have a concurrency problem when using functions from an imported library. My code computes several kinds of variables and, in the last step, saves them to different files. I hit the same problem whether I'm reading or writing.

This is an example that works because it runs sequentially:

import xarray as xr

# (Methods of a larger class; self.get_file_list is defined elsewhere.)

def read_concurrent_files(self):

    files_var_type1 = self.get_files('type1', '20200101', '20200127')
    files_var_type2 = self.get_files('type2', '20200101', '20200127')
    files_var_type3 = self.get_files('type3', '20200101', '20200127')

def get_files(self, varType, dateini, datefin):

    # get_file_list returns a list of file paths for the given
    # variable type and date range
    files = self.get_file_list(varType, dateini, datefin)
    # Open all the files as one combined dataset
    files_raw = xr.open_mfdataset(files, engine='cfgrib',
        combine='nested', concat_dim='time', decode_coords=False, parallel=True)
    return files_raw
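
(get_file_list itself is not part of the question; a minimal glob-based sketch of what such a helper might look like, purely for illustration, with a made-up directory layout and file-name convention:)

import glob

def get_file_list(self, varType, dateini, datefin):
    # Illustrative only: collect the GRIB files for one variable type
    # and keep those whose YYYYMMDD date (assumed to be embedded at the
    # end of the file name, e.g. era5_type1_20200115.grib2) falls in
    # the requested range. YYYYMMDD strings compare correctly as text.
    paths = sorted(glob.glob(f'data/{varType}/*.grib2'))
    return [p for p in paths if dateini <= p.split('_')[-1][:8] <= datefin]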

But when I make these changes so the code runs concurrently, it fails:

import xarray as xr
from multiprocessing.pool import ThreadPool

def read_concurrent_files(self):

    pool = ThreadPool(processes=3)

    # Launch the three reads concurrently, then block on the results
    async_result1 = pool.apply_async(self.get_files, ('type1', '20200101', '20200127'))
    async_result2 = pool.apply_async(self.get_files, ('type2', '20200101', '20200127'))
    async_result3 = pool.apply_async(self.get_files, ('type3', '20200101', '20200127'))

    files_var_type1 = async_result1.get()
    files_var_type2 = async_result2.get()
    files_var_type3 = async_result3.get()

def get_files(self, varType, dateini, datefin):

    # get_file_list returns a list of file paths for the given
    # variable type and date range
    files = self.get_file_list(varType, dateini, datefin)
    files_raw = xr.open_mfdataset(files, engine='cfgrib',
        combine='nested', concat_dim='time', decode_coords=False, parallel=True)
    return files_raw

The problem seems to be the xr.open_mfdataset call, which (as far as I can tell) is not thread-safe.

Is there a way to encapsulate the imported library into the method scope only?

I come from other languages where this was easy: create the instance inside the method, or use thread-safe objects.
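
(For example, the closest Python equivalent I could think of is guarding the call with a lock; a sketch, with threading.Lock being my assumption of the equivalent. It avoids the clash but serializes the opens, which defeats the point of the thread pool:)

import threading
import xarray as xr

# A module-level lock that serializes the non-thread-safe call
open_lock = threading.Lock()

def get_files(self, varType, dateini, datefin):
    files = self.get_file_list(varType, dateini, datefin)
    # Only one thread at a time may call open_mfdataset
    with open_lock:
        files_raw = xr.open_mfdataset(files, engine='cfgrib',
            combine='nested', concat_dim='time', decode_coords=False, parallel=True)
    return files_raw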

Thanks a lot in advance!!


Answer

As I'm new to Python, I was unaware of the different kinds of worker pools we can create. In my example above I was using a ThreadPool, whose threads are still subject to the GIL (Global Interpreter Lock). To avoid it, we can use a pool of processes instead. Here is an example:

import os
import concurrent.futures

import xarray as xr

def get_xarray(self):
    tasks = []
    cpu_count = os.cpu_count()
    # One worker process per CPU; each file is opened in its own
    # process, so the GIL is no longer a bottleneck. Note that self
    # and the submitted arguments must be picklable to cross the
    # process boundary.
    with concurrent.futures.ProcessPoolExecutor(max_workers=cpu_count) as executor:
        for file in self.files:
            tasks.append(executor.submit(self.get_xarray_by_file, file))

    results = [task.result() for task in tasks]
    era_raw = xr.merge(results, compat='override')

    return era_raw.persist().load()

def get_xarray_by_file(self, file):
    era_raw = xr.open_mfdataset(file, engine='cfgrib',
        combine='nested', concat_dim='time', decode_coords=False, parallel=True)
    return era_raw.persist().load()

In this case we use the ProcessPoolExecutor. From the Python documentation:

The ProcessPoolExecutor class is an Executor subclass that uses a pool of processes to execute calls asynchronously. ProcessPoolExecutor uses the multiprocessing module, which allows it to side-step the Global Interpreter Lock but also means that only picklable objects can be executed and returned.
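
A practical consequence worth spelling out (my note, not from the docs quote above): the submitted callable must be importable by the worker processes, so lambdas and locally defined functions won't work, and on platforms that spawn workers the pool must be created under an if __name__ == '__main__': guard. A minimal sketch:

import concurrent.futures

def square(x):
    # Must be a module-level function so the worker process can
    # import a reference to it and unpickle the call
    return x * x

if __name__ == '__main__':
    with concurrent.futures.ProcessPoolExecutor() as executor:
        print(list(executor.map(square, range(5))))  # [0, 1, 4, 9, 16]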

Now we can read GRIB2 files in parallel, or create NetCDF or CSV files from a dataframe, with real parallelism.
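
For instance, a sketch of the writing side (the dataframes and file names here are made up for illustration; the point is only the ProcessPoolExecutor pattern):

import concurrent.futures
import pandas as pd

def write_csv(job):
    # Runs in a worker process; the dataframe is pickled and sent over
    path, df = job
    df.to_csv(path, index=False)
    return path

if __name__ == '__main__':
    jobs = [(f'var{i}.csv', pd.DataFrame({'x': range(3)})) for i in range(3)]
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for path in executor.map(write_csv, jobs):
            print('wrote', path)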
