How to automatically split a pandas dataframe into multiple chunks?

We have a batch processing system which we are looking to modify to use multiple threads. The process takes in a delimited file and performs calculations on it via pandas.

I would like to split the dataframe into N chunks if the total number of records exceeds a threshold. Each chunk would then be fed to a thread from a thread pool executor to perform the calculations; at the end I would wait for all the threads to finish and concatenate the resulting DataFrames into one.

The problem is that I’m not sure how to split a pandas DataFrame like this. Let’s say there is an arbitrary number of threads, 2 as an example, and I want to start splitting once the record count exceeds 200000.

So the idea would be: if I send a file with 200001 records, thread 1 would get 100000 records and thread 2 would get 100001. If I send one with 1000000 records, each thread would get 500000.

(If the total number of records doesn’t exceed the threshold, I’d just execute the process on a single thread.)

I have seen related solutions, but none of them apply to my case.
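
To make the target concrete, this is roughly the kind of split I’m after (just a sketch using the example numbers above, not my actual pipeline):

import numpy as np
import pandas as pd

threshold = 200000
num_threads = 2

df = pd.DataFrame({'value': range(200001)})   # 200001 example records

if len(df) > threshold:
    # cut the row range into num_threads nearly equal slices
    bounds = np.linspace(0, len(df), num_threads + 1, dtype=int)
    chunks = [df.iloc[lo:hi] for lo, hi in zip(bounds[:-1], bounds[1:])]
else:
    chunks = [df]

print([len(c) for c in chunks])   # [100000, 100001]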

Answer

Below is example code showing how to split the DataFrame. It then uses ThreadPoolExecutor to run the processing, with eight threads in my case (you could use the threading module instead). The process_pandas function is just a dummy; substitute whatever calculation you want:

import pandas as pd
from concurrent.futures import ThreadPoolExecutor as th

threshold = 300      # only split when the frame has more rows than this
block_size = 100     # rows per chunk
num_threads = 8

big_list = pd.read_csv('pandas_list.csv', delimiter=';', header=None)

blocks = []

if len(big_list) > threshold:
    # slice the frame into consecutive blocks of block_size rows;
    # the final block simply holds whatever remainder is left
    for start in range(0, len(big_list), block_size):
        blocks.append(big_list.iloc[start:start + block_size])
else:
    blocks.append(big_list)

def process_pandas(df):
    # dummy calculation: mark the first row of each chunk
    print('Doing calculations...')
    df = df.copy()  # work on a copy to avoid SettingWithCopyWarning on the slice
    df.loc[df.index[0], 2] = 'changed'
    return df

with th(num_threads) as ex:
    results = ex.map(process_pandas, blocks)

final_dataframe = pd.concat(results, axis=0)
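
Since your question asks for exactly N chunks rather than fixed-size blocks, you could also build the blocks list by splitting the row positions into num_threads pieces; here is a sketch of that variant (numpy is an assumption on my part, not part of the code above):

import numpy as np

# alternative to the block_size loop: one chunk per thread
if len(big_list) > threshold:
    positions = np.array_split(np.arange(len(big_list)), num_threads)
    blocks = [big_list.iloc[p] for p in positions]
else:
    blocks = [big_list]

Either way, ex.map returns the processed chunks in the order they were submitted, so pd.concat reassembles the rows in their original order.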