We have a batch processing system which we are looking to modify to use multiple threads. The process takes in a delimited file and performs calculations on it via pandas.
I would like to split the dataframe into N chunks if the total number of records exceeds a threshold. Each chunk would then be fed to a thread from a thread pool executor to run the calculations, and at the end I would wait for all the threads to finish and concatenate the resulting DFs into one.
The problem is that I'm not sure how to split a pandas DF like this. Let's say there will be an arbitrary number of threads, 2 as an example, and I want to start splitting if the record count is over 200000.
So the idea would be: if I send a file with 200001 records, thread 1 would get 100000 records and thread 2 would get 100001. If I send one with 1000000 records, each thread would get 500000.
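To make the split arithmetic concrete, here is a rough sketch of the chunk sizes I have in mind (the numbers are just the example values above):

# Illustration of the even split described above (example numbers only)
n_records = 200001
n_threads = 2
base, extra = divmod(n_records, n_threads)
sizes = [base + 1 if i < extra else base for i in range(n_threads)]
print(sizes)  # [100001, 100000] -- chunk sizes differ by at most one record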
(If the total records don’t exceed this threshold, I’d just execute the process on a single thread)
I have seen related solutions, but none have applied to my case.
Answer
Below, I've included example code showing how to split. Then, using ThreadPoolExecutor, it will execute the code with eight threads in my case (you could also use the threading library). The process_pandas function is just a dummy function; you can use whatever you want:
import pandas as pd
from concurrent.futures import ThreadPoolExecutor

threshold = 300
block_size = 100
num_threads = 8

big_list = pd.read_csv('pandas_list.csv', delimiter=';', header=None)

# Split into fixed-size blocks only when the record count exceeds the threshold.
blocks = []
if len(big_list) > threshold:
    num_full_blocks = len(big_list) // block_size
    for i in range(num_full_blocks):
        blocks.append(big_list[block_size * i:block_size * (i + 1)])
    # Append the leftover rows when the length is not a multiple of block_size.
    if num_full_blocks * block_size < len(big_list):
        blocks.append(big_list[num_full_blocks * block_size:])
else:
    blocks.append(big_list)

def process_pandas(df):
    # Dummy calculation: mark the first row of the block.
    # Work on a copy to avoid pandas' SettingWithCopyWarning,
    # since each block is a slice (view) of the original frame.
    df = df.copy()
    print('Doing calculations...')
    df.loc[df.index[0], 2] = 'changed'
    return df

with ThreadPoolExecutor(max_workers=num_threads) as ex:
    results = ex.map(process_pandas, blocks)

final_dataframe = pd.concat(results, axis=0)
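Since the question asks for a fixed number of chunks rather than a fixed block size, here is a sketch of a variant (my suggestion, not part of the code above) using numpy's array_split, which divides a DataFrame row-wise into N near-equal pieces in one call:

import numpy as np
import pandas as pd
from concurrent.futures import ThreadPoolExecutor

threshold = 200000
num_threads = 2

df = pd.read_csv('pandas_list.csv', delimiter=';', header=None)

def process_pandas(block):
    # Placeholder for the real calculation, like the dummy function above.
    return block.copy()

# np.array_split returns num_threads row-wise pieces whose sizes differ by
# at most one, e.g. 200001 rows -> chunks of 100001 and 100000 rows.
blocks = np.array_split(df, num_threads) if len(df) > threshold else [df]

with ThreadPoolExecutor(max_workers=num_threads) as ex:
    final_dataframe = pd.concat(ex.map(process_pandas, blocks), axis=0)

One caveat: for CPU-bound pandas calculations, Python's GIL means a thread pool may not yield much of a speedup; ProcessPoolExecutor is the usual drop-in alternative, at the cost of pickling each chunk between processes.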