I’m trying to speed up my processing of a PIL.Image, where I divide the image into small parts, search a database for the most similar image part, and then replace the original small part of the image with the one found.
This is the function in question:
def work_image(img, lenx, leny, neigh, split_dict, img_train_rot):
    constructed_img = Image.new(mode='L', size=img.size)
    for x in range(0,img.size[0],lenx):
        for y in range(0,img.size[1],leny):
            box = (x,y,x+lenx,y+leny)
            split_img = img.crop(box)
            res = neigh.kneighbors(np.asarray(split_img).ravel().reshape((1,-1)))
            #look up the found image part in img_train_rot and define the position as new_box
            constructed_img.paste(img_train_rot[i].crop(new_box), (x,y))
    return constructed_img
Now I wanted to parallelize this function, since, for example, each row of such image parts could be processed entirely on its own.
I came up with this approach using multiprocessing.Pool:
def work_image_parallel(leny, neigh, split_dict, img_train_rot, img_slice):
    constructed_img_slice = Image.new(mode='L', size=img_slice.size)
    for y in range(0, img_slice.size[1], leny):
        box = (0, y, img_slice.size[0], y+leny)
        img_part = img_slice.crop(box)
        res = neigh.kneighbors(np.asarray(img_part).ravel().reshape((1,-1)))
        #look up the found image part in img_train_rot and define the position as new_box
        constructed_img_slice.paste(img_train_rot[i].crop(new_box), (0,y))
    return constructed_img_slice

if __name__ == '__main__':
    lenx, leny = 16, 16
    #define my image database and so on
    neigh = setup_nearest_neighbour(train_imgs, n_neighbors=1)
    test_img = test_imgs[0]
    func = partial(work_image_parallel, leny, neigh, split_dict, img_train_rot)
    pool = multiprocessing.Pool()
    try:
        res = pool.map(func, map(lambda x: x,
                                 [test_img.crop((x, 0, x+lenx, test_img.size[1]))
                                  for x in range(0, test_img.size[0], lenx)]))
    finally:
        pool.close()
        pool.join()
    test_result2 = Image.new(mode='L', size=test_img.size)
    for i in range(len(res)):
        test_result2.paste(res[i], box=(i*lenx, 0, i*lenx + lenx, test_result2.size[1]))
However, this parallelized version isn’t really faster than the normal version, and if I decrease the size of the image parts, the parallelized version throws an AssertionError (other posts suggest this happens because the amount of data to be sent between the processes becomes too big).
Hence my question: did I do something wrong? Is multiprocessing perhaps not the right approach here? Or why doesn’t multiprocessing decrease the computation time, given that the workload per image slice should be large enough to offset the time needed to create the processes? Any help would be appreciated.
Answer
Disclaimer: I am not that familiar with PIL, so you should take a close look at the PIL method calls, which may need some “adjustment” on your part, since there is no way that I can actually test this.
First, I observe that you will probably be making a lot of repeated invocations of your worker function work_image_parallel, and that some of the arguments being passed to that function might be quite large (all of this depends, of course, on how large your images are). Rather than repeatedly passing such potentially large arguments, I would prefer to copy these arguments once to each process in your pool and instantiate them as global variables. This is accomplished with a pool initializer function.
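As a toy illustration of that initializer pattern (the array and worker names below are made up for the example, not taken from your code), the large object is copied into each worker exactly once, and afterwards each task only ships its small per-task argument:

import multiprocessing
import numpy as np

def init_pool(the_big_array):
    # runs once per worker process; the copy lives there as a global
    global big_array
    big_array = the_big_array

def row_sum(i):
    # only the small index i is pickled per task, not the big array
    return big_array[i].sum()

if __name__ == '__main__':
    big = np.random.rand(1000, 1000)  # large object we do not want to re-send with every task
    with multiprocessing.Pool(initializer=init_pool, initargs=(big,)) as pool:
        totals = pool.map(row_sum, range(len(big)))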
Second, I have attempted to modify your work_image_parallel function to be as close as possible to your original work_image function, except that it now deals with just a single x, y coordinate pair that is passed to it. In that way more of the work is being done by your subprocesses. I have also tried to reduce the number of pasting operations required (if I have correctly understood what is going on).
Third, because the images may be quite large, I am using a generator expression to create the arguments to be used with imap_unordered instead of map. This is because the number of x, y pairs can be quite large in a very large image, and map requires that its iterable argument be such that its length can be computed, so that an efficient chunksize value can be determined (see the docs). With imap_unordered, we should specify an explicit chunksize value to be efficient (the default is 1 if unspecified) if we expect that the iterable could be large. If you know that you are dealing with relatively small images, so that the size of the x_y_args iterable would not be unreasonably memory-inefficient if stored as a list, then you could just use method map with the default chunksize value of None and have the pool compute the value for you. The advantage of using imap_unordered is that results do not have to be returned in order, so processing could be faster.
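To make the map/imap_unordered difference concrete, here is a small self-contained sketch (with a trivial square worker standing in for the real per-tile work; none of these names come from your code): map materializes its iterable and returns results in order, while imap_unordered can consume a generator lazily and hands results back as soon as they are ready, which is why an explicit chunksize matters there:

import multiprocessing

def square(n):
    # stand-in for the real per-tile work
    return n, n * n

if __name__ == '__main__':
    with multiprocessing.Pool(4) as pool:
        # map: the iterable is effectively turned into a list, results come back in order
        ordered = pool.map(square, range(1000))
        # imap_unordered: the generator stays lazy, results arrive in completion order
        lazy_args = (n for n in range(1000))
        unordered = list(pool.imap_unordered(square, lazy_args, chunksize=8))
    print(ordered[:3], unordered[:3])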
def init_pool(the_img, the_img_train_rot, the_neigh, the_split_dict):
    global img, img_train_rot, neigh, split_dict
    img = the_img
    img_train_rot = the_img_train_rot
    neigh = the_neigh
    split_dict = the_split_dict

def work_image_parallel(lenx, leny, t):
    x, y = t
    box = (x,y,x+lenx,y+leny)
    split_img = img.crop(box)
    res = neigh.kneighbors(np.asarray(split_img).ravel().reshape((1,-1)))
    #look up the found image part in img_train_rot and define the position as new_box
    # return original x, y values used:
    return x, y, img_train_rot[i].crop(new_box)

def compute_chunksize(iterable_size, pool_size):
    chunksize, remainder = divmod(iterable_size, 4 * pool_size)
    if remainder:
        chunksize += 1
    return chunksize

if __name__ == '__main__':
    lenx, leny = 16, 16
    #define my image database and so on
    neigh = setup_nearest_neighbour(train_imgs, n_neighbors=1)
    test_img = test_imgs[0]
    func = partial(work_image_parallel, lenx, leny)
    # in case this is a very large image, use a generator expression
    x_y_args = ((x, y) for x in range(0, test_img.size[0], lenx)
                       for y in range(0, test_img.size[1], leny))
    # approximate size of x_y_args:
    iterable_size = (test_img.size[0] // lenx) * (test_img.size[1] // leny)
    pool_size = multiprocessing.cpu_count()
    chunksize = compute_chunksize(iterable_size, pool_size)
    pool = multiprocessing.Pool(pool_size, initializer=init_pool,
                                initargs=(test_img, img_train_rot, neigh, split_dict))
    test_result2 = Image.new(mode='L', size=test_img.size)
    try:
        # use imap or imap_unordered when the iterable is a generator to avoid conversion of the iterable to a list,
        # but specify a suitable chunksize for efficiency in case the iterable is very large:
        for x, y, res in pool.imap_unordered(func, x_y_args, chunksize=chunksize):
            test_result2.paste(res, (x, y))
    finally:
        pool.close()
        pool.join()
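As a rough worked example of that chunksize heuristic: with a hypothetical 512x512 test image and 16x16 tiles, iterable_size would be 32 * 32 = 1024; on an 8-core machine compute_chunksize returns 1024 // (4 * 8) = 32, so each worker pulls coordinate pairs from the task queue in batches of 32 instead of one at a time.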
Update (break up image into bigger slices)
def init_pool(the_img, the_img_train_rot, the_neigh, the_split_dict):
    global img, img_train_rot, neigh, split_dict
    img = the_img
    img_train_rot = the_img_train_rot
    neigh = the_neigh
    split_dict = the_split_dict

def work_image_parallel(lenx, leny, x):
    img_slice = img.crop((x, 0, x+lenx, img.size[1]))
    constructed_img_slice = Image.new(mode='L', size=img_slice.size)
    for y in range(0, img_slice.size[1], leny):
        box = (0, y, img_slice.size[0], y+leny)
        img_part = img_slice.crop(box)
        res = neigh.kneighbors(np.asarray(img_part).ravel().reshape((1,-1)))
        #look up the found image part in img_train_rot and define the position as new_box
        constructed_img_slice.paste(img_train_rot[i].crop(new_box), (0,y))
    return constructed_img_slice

if __name__ == '__main__':
    lenx, leny = 16, 16
    #define my image database and so on
    neigh = setup_nearest_neighbour(train_imgs, n_neighbors=1)
    test_img = test_imgs[0]
    pool_size = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(pool_size, initializer=init_pool,
                                initargs=(test_img, img_train_rot, neigh, split_dict))
    func = partial(work_image_parallel, lenx, leny)
    try:
        test_result2 = Image.new(mode='L', size=test_img.size)
        x = 0
        for res in pool.map(func, [x for x in range(0, test_img.size[0], lenx)]):
            test_result2.paste(res, box=(x, 0, x + lenx, test_result2.size[1]))
            x += lenx
    finally:
        pool.close()
        pool.join()
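Compared with the per-tile version above, each task here covers a whole vertical strip of the image, so there are far fewer (and larger) tasks and results crossing process boundaries; the trade-off is coarser load balancing across the workers.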