
Using multiprocessing for image processing

I’m trying to speed up my processing of a PIL.Image, where I divide the image into small parts, search for the most similar image part inside a database, and then replace the original small part of the image with the one found.
This is the function in question:

def work_image(img, lenx, leny, neigh, split_dict, img_train_rot):
    constructed_img = Image.new(mode='L', size=img.size)
    for x in range(0, img.size[0], lenx):
        for y in range(0, img.size[1], leny):
            box = (x, y, x + lenx, y + leny)
            split_img = img.crop(box)
            res = neigh.kneighbors(np.asarray(split_img).ravel().reshape((1, -1)))
            # look up the found image part in img_train_rot and define the position as new_box
            constructed_img.paste(img_train_rot[i].crop(new_box), (x, y))
    return constructed_img

Now I wanted to parallelize this function, since, for example, each slice of such image parts could be dealt with entirely on its own.
I came up with this approach using multiprocessing.Pool:

def work_image_parallel(leny, neigh, split_dict, img_train_rot, img_slice):
    constructed_img_slice = Image.new(mode='L', size=img_slice.size)
    for y in range(0, img_slice.size[1], leny):
        box = (0, y, img_slice.size[0], y + leny)
        img_part = img_slice.crop(box)
        res = neigh.kneighbors(np.asarray(img_part).ravel().reshape((1, -1)))
        # look up the found image part in img_train_rot and define the position as new_box
        constructed_img_slice.paste(img_train_rot[i].crop(new_box), (0, y))
    return constructed_img_slice

if __name__ == '__main__':
    lenx, leny = 16, 16
    #define my image database and so on
    neigh = setup_nearest_neighbour(train_imgs, n_neighbors=1)
    test_img = test_imgs[0]
    func = partial(work_image_parallel, leny, neigh, split_dict, img_train_rot)
    pool = multiprocessing.Pool()
    try:
        res = pool.map(func, [test_img.crop((x, 0, x + lenx, test_img.size[1]))
                              for x in range(0, test_img.size[0], lenx)])
    finally:
        pool.close()
        pool.join()
    test_result2 = Image.new(mode='L', size=test_img.size)
    for i in range(len(res)):
        test_result2.paste(res[i], box=(i * lenx, 0, i * lenx + lenx, test_result2.size[1]))

However, this parallelized version isn’t any faster than the normal version, and if I decrease the size of my image subdivisions, the parallelized version throws an AssertionError (other posts suggested this might be because the amount of data to be sent between the processes becomes too big).

Hence my question: did I do something wrong? Is multiprocessing perhaps not the right approach here? Or why doesn’t multiprocessing decrease the computation time, given that the workload per image slice should be big enough to offset the time needed to create the processes? Any help would be appreciated.


Answer

Disclaimer: I am not that familiar with PIL, so you should take a close look at the PIL method calls, which may need some “adjustment” on your part, since there is no way I can actually test this.

First, I observe that you will probably be making a lot of repeated invocations of your worker function work_image_parallel, and that some of the arguments being passed to it might be quite large (all of this depends, of course, on how large your images are). Rather than repeatedly passing such potentially large arguments, I would prefer to copy them once to each process in your pool and make them available as global variables. This is accomplished with a pool initializer function.

Second, I have attempted to keep the modified work_image_parallel function as close to your original work_image function as possible, except that it now deals with just a single x, y coordinate pair that is passed to it. In that way more of the work is being done by your subprocesses. I have also tried to reduce the number of pasting operations required (if I have correctly understood what is going on).

Third, because the images may be quite large, I am using a generator expression to create the arguments and imap_unordered instead of map. The number of x, y pairs can be very large for a big image, and map requires an iterable whose length can be computed so that an efficient chunksize value can be derived (see the docs). With imap_unordered we should specify an explicit chunksize (the default is 1) whenever the iterable could be large. If you know you are dealing with relatively small images, so that storing the x_y_args iterable as a list would not be unreasonably memory-inefficient, you could simply use map with the default chunksize value of None and let the pool compute the value for you. The advantage of imap_unordered is that results do not have to be returned in order, so processing could be faster.
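For that small-image case, the map variant might look like this (a minimal sketch, assuming func, test_img, test_result2, and the pool are set up as in the full listing below):

x_y_args = [(x, y)
            for x in range(0, test_img.size[0], lenx)
            for y in range(0, test_img.size[1], leny)]
# a list has a known length, so Pool.map can compute a chunksize by itself
for x, y, res in pool.map(func, x_y_args):
    test_result2.paste(res, (x, y))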

from functools import partial
import multiprocessing

import numpy as np
from PIL import Image

def init_pool(the_img, the_img_train_rot, the_neigh, the_split_dict):
    global img, img_train_rot, neigh, split_dict
    img = the_img
    img_train_rot = the_img_train_rot
    neigh = the_neigh
    split_dict = the_split_dict

def work_image_parallel(lenx, leny, t):
    x, y = t
    box = (x, y, x + lenx, y + leny)
    split_img = img.crop(box)
    res = neigh.kneighbors(np.asarray(split_img).ravel().reshape((1, -1)))
    # look up the found image part in img_train_rot and define the position as new_box
    # return the original x, y values together with the replacement image part:
    return x, y, img_train_rot[i].crop(new_box)

def compute_chunksize(iterable_size, pool_size):
    # Mirror the heuristic Pool.map uses internally: split the work into
    # roughly 4 chunks per pool process, rounding up.
    # E.g. a 1024x1024 image with 16x16 tiles gives 64 * 64 = 4096 tasks;
    # with 8 processes that is divmod(4096, 32) -> a chunksize of 128.
    chunksize, remainder = divmod(iterable_size, 4 * pool_size)
    if remainder:
        chunksize += 1
    return chunksize

if __name__ == '__main__':
    lenx, leny = 16, 16
    #define my image database and so on
    neigh = setup_nearest_neighbour(train_imgs, n_neighbors=1)
    test_img = test_imgs[0]
    func = partial(work_image_parallel, lenx, leny)
    # in case this is a very large image, use a generator expression
    x_y_args = ((x, y) for x in range(0, test_img.size[0], lenx) for y in range(0, test_img.size[1], leny))
    # approximate size of x_y_args:
    iterable_size = (test_img.size[0] // lenx) * (test_img.size[1] // leny)
    pool_size = multiprocessing.cpu_count()
    chunksize = compute_chunksize(iterable_size, pool_size)
    pool = multiprocessing.Pool(pool_size, initializer=init_pool, initargs=(test_img, img_train_rot, neigh, split_dict))
    test_result2 = Image.new(mode='L', size=test_img.size)
    try:
        # use imap or imap_unordered when the iterable is a generator to avoid conversion of iterable to a list
        # but specify a suitable chunksize for efficiency in case the iterable is very large:
        for x, y, res in pool.imap_unordered(func, x_y_args, chunksize=chunksize):
            test_result2.paste(res, (x, y))
    finally:
        pool.close()
        pool.join()

Update (break up the image into bigger slices)

Here each task is an entire vertical slice again, as in your original attempt, but only the slice’s x offset is passed to the worker; the image itself is copied to each pool process once by the initializer.

from functools import partial
import multiprocessing

import numpy as np
from PIL import Image

def init_pool(the_img, the_img_train_rot, the_neigh, the_split_dict):
    global img, img_train_rot, neigh, split_dict
    img = the_img
    img_train_rot = the_img_train_rot
    neigh = the_neigh
    split_dict = the_split_dict

def work_image_parallel(lenx, leny, x):
    img_slice = img.crop((x, 0, x + lenx, img.size[1]))
    constructed_img_slice = Image.new(mode='L', size=img_slice.size)
    for y in range(0, img_slice.size[1], leny):
        box = (0, y, img_slice.size[0], y + leny)
        img_part = img_slice.crop(box)
        res = neigh.kneighbors(np.asarray(img_part).ravel().reshape((1, -1)))
        # look up the found image part in img_train_rot and define the position as new_box
        constructed_img_slice.paste(img_train_rot[i].crop(new_box), (0, y))
    return constructed_img_slice

if __name__ == '__main__':
    lenx, leny = 16, 16
    #define my image database and so on
    neigh = setup_nearest_neighbour(train_imgs, n_neighbors=1)
    test_img = test_imgs[0]
    pool_size = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(pool_size, initializer=init_pool, initargs=(test_img, img_train_rot, neigh, split_dict))
    func = partial(work_image_parallel, lenx, leny)
    try:
        test_result2 = Image.new(mode='L', size=test_img.size)
        x = 0
        # pool.map returns results in argument order, so x can be advanced in step:
        for res in pool.map(func, range(0, test_img.size[0], lenx)):
            test_result2.paste(res, box=(x, 0, x + lenx, test_result2.size[1]))
            x += lenx
    finally:
        pool.close()
        pool.join()