I am trying to implement some parallel jobs using concurrent.futures. Each worker requires a copy of a TensorFlow model and some data. I implemented it in the following way (MWE):
import tensorflow as tf
from tensorflow import keras
import numpy as np
import concurrent.futures
import time

def simple_model():
    model = keras.models.Sequential([
        keras.layers.Dense(units=10, input_shape=[1]),
        keras.layers.Dense(units=1, activation='sigmoid')
    ])
    model.compile(optimizer='sgd', loss='mean_squared_error')
    return model

def clone_model(model):
    model_clone = tf.keras.models.clone_model(model)
    model_clone.set_weights(model.get_weights())
    return model_clone

def work(model, seq):
    return model.predict(seq)

def worker(model, num_of_seq=4):
    seqences = np.arange(0, 100).reshape(num_of_seq, -1)
    with concurrent.futures.ProcessPoolExecutor(max_workers=None) as executor:
        t0 = time.perf_counter()
        model_list = [clone_model(model) for _ in range(num_of_seq)]
        future_to_samples = {executor.submit(work, model, seq): seq
                             for model, seq in zip(model_list, seqences)}
        Seq_out = []
        for future in concurrent.futures.as_completed(future_to_samples):
            out = future.result()
            Seq_out.append(out)
        t1 = time.perf_counter()
        print(t1 - t0)
    return np.reshape(Seq_out, (-1,)), t1 - t0

if __name__ == '__main__':
    model = simple_model()
    out = worker(model, num_of_seq=4)
    print(out)
simple_model() creates the model, clone_model clones a TensorFlow model, work represents an MWE of possible work, and worker assigns the work in parallel.
This is not working; it just gets stuck and does not produce any output. However, the above code works if I replace ProcessPoolExecutor with ThreadPoolExecutor, but that provides no speedup (it may not actually be running the workers in parallel). From my understanding, the error lies in the model argument of

future_to_samples = {executor.submit(work, model, seq): seq for model, seq in zip(model_list, seqences)}
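One way to check this hypothesis, assuming ProcessPoolExecutor pickles the submitted arguments before sending them to the child processes: try pickling the model directly. On many TensorFlow versions this fails outright, which would explain why the workers never receive anything. This is a minimal diagnostic sketch, not part of the original code:

import pickle
import tensorflow as tf
from tensorflow import keras

model = keras.models.Sequential([keras.layers.Dense(units=1, input_shape=[1])])
try:
    # ProcessPoolExecutor pickles submitted arguments under the hood
    payload = pickle.dumps(model)
    print(f'model pickled to {len(payload)} bytes')
except Exception as e:
    print('pickling failed:', type(e).__name__, e)

Whether pickling succeeds depends on the TensorFlow/Keras version; if it raises here, the model cannot be shipped to the pool as an argument.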
Answer
I modified the code so that it sends the path of the saved model, rather than the model object itself, to the child processes, and it works. Each child process loads its own copy of the model from disk, which sidesteps pickling the Keras model when submitting the task.
import tensorflow as tf
from tensorflow import keras
import numpy as np
import concurrent.futures
import time

# Optional GPU setup:
# gpus = tf.config.experimental.list_physical_devices('GPU')
# if len(gpus) > 0:
#     print(f'GPUs {gpus}')
#     try:
#         tf.config.experimental.set_memory_growth(gpus[0], True)
#     except RuntimeError:
#         pass

def simple_model():
    model = keras.models.Sequential([
        keras.layers.Dense(units=10, input_shape=[1]),
        keras.layers.Dense(units=1, activation='sigmoid')
    ])
    model.compile(optimizer='sgd', loss='mean_squared_error')
    return model

def clone_model(model):
    model_clone = tf.keras.models.clone_model(model)
    model_clone.set_weights(model.get_weights())
    return model_clone

def work(model_path, seq):
    # Each child process loads its own copy of the model from disk.
    model = tf.keras.models.load_model(model_path)
    return model.predict(seq)

def worker(model, num_of_seq=4):
    seqences = np.arange(0, num_of_seq * 10).reshape(num_of_seq, -1)
    model_savepath = './simple_model.h5'
    model.save(model_savepath)
    path_list = [model_savepath for _ in range(num_of_seq)]
    with concurrent.futures.ProcessPoolExecutor(max_workers=None) as executor:
        t0 = time.perf_counter()
        # Submit the model *path* (a plain string) instead of the model object.
        future_to_samples = {executor.submit(work, path, seq): seq
                             for path, seq in zip(path_list, seqences)}
        Seq_out = []
        for future in concurrent.futures.as_completed(future_to_samples):
            out = future.result()
            Seq_out.append(out)
        t1 = time.perf_counter()
        print(t1 - t0)
    return np.reshape(Seq_out, (-1,)), t1 - t0

if __name__ == '__main__':
    model = simple_model()
    num_of_seq = 400
    out = worker(model, num_of_seq=num_of_seq)
    print(out)
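A possible refinement, sketched here rather than taken from the answer above: ProcessPoolExecutor accepts initializer and initargs (Python 3.7+), so each worker process can load the model once when it starts and reuse it across tasks, instead of reloading it for every submitted sequence. The _init_worker and _model names are hypothetical; the model path matches the answer's './simple_model.h5'.

import concurrent.futures
import numpy as np
import tensorflow as tf
from tensorflow import keras

_model = None  # per-process global, populated once by the initializer

def _init_worker(model_path):
    # Runs once in each child process when the pool starts it.
    global _model
    _model = tf.keras.models.load_model(model_path)

def work(seq):
    # Reshape to (batch, 1) to match the Dense layer's input_shape=[1].
    return _model.predict(seq.reshape(-1, 1))

if __name__ == '__main__':
    model = keras.models.Sequential([
        keras.layers.Dense(units=10, input_shape=[1]),
        keras.layers.Dense(units=1, activation='sigmoid')
    ])
    model.compile(optimizer='sgd', loss='mean_squared_error')
    model.save('./simple_model.h5')

    seqs = np.arange(0, 40).reshape(4, -1)
    with concurrent.futures.ProcessPoolExecutor(
            max_workers=4,
            initializer=_init_worker,
            initargs=('./simple_model.h5',)) as ex:
        outs = list(ex.map(work, seqs))
    print(np.reshape(outs, (-1,)))

With many tasks per worker (e.g. num_of_seq = 400 as above), loading the model once per process rather than once per task should cut most of the repeated deserialization cost.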