I am trying to run some jobs in parallel using `concurrent.futures`. Each worker requires a copy of a TensorFlow model and some data. I implemented it in the following way (MWE):
import tensorflow as tf
from tensorflow import keras
import numpy as np
import concurrent.futures
import time
def simple_model():
    """Build and compile a small two-layer dense Keras model.

    The network takes a single scalar feature, passes it through a
    10-unit dense layer and a 1-unit sigmoid output layer, and is compiled
    with SGD and mean-squared-error loss.

    Returns:
        The compiled ``keras.models.Sequential`` model.
    """
    model = keras.models.Sequential([
        keras.layers.Dense(units=10, input_shape=[1]),
        keras.layers.Dense(units=1, activation='sigmoid'),
    ])  # the original snippet never closed this list/call
    model.compile(optimizer='sgd', loss='mean_squared_error')
    return model
def clone_model(model):
    """Return an independent copy of ``model`` with the same weights.

    ``tf.keras.models.clone_model`` only reproduces the architecture and
    creates freshly initialised weights, so the weights are copied over
    explicitly to make the clone predict exactly like the original.

    Args:
        model: The Keras model to duplicate.

    Returns:
        A new Keras model with the same architecture and weight values.
    """
    model_clone = tf.keras.models.clone_model(model)
    # Without this the clone would carry new, randomly initialised weights.
    model_clone.set_weights(model.get_weights())
    return model_clone
def work(model, seq):
    """Run ``model.predict`` on one sequence and return its result.

    Args:
        model: Any object exposing a ``predict`` method.
        seq: The input handed to ``model.predict``.

    Returns:
        Whatever ``model.predict(seq)`` returns.
    """
    predictions = model.predict(seq)
    return predictions
def worker(model, num_of_seq=4):
    """Predict on ``num_of_seq`` sequences in parallel and time the run.

    The integers 0..99 are split into ``num_of_seq`` equally sized rows
    (so ``num_of_seq`` must divide 100), one model clone is created per
    row, and each (clone, row) pair is submitted to a process pool.

    Args:
        model: The Keras model to clone for every task.
        num_of_seq: Number of sequences / parallel tasks.

    Returns:
        A tuple ``(predictions, elapsed)`` where ``predictions`` is the
        flattened 1-D array of all task outputs in submission order and
        ``elapsed`` is the wall-clock time in seconds.
    """
    seqences = np.arange(0, 100).reshape(num_of_seq, -1)
    with concurrent.futures.ProcessPoolExecutor(max_workers=None) as executor:
        t0 = time.perf_counter()
        model_list = [clone_model(model) for _ in range(num_of_seq)]
        # NOTE: the model objects themselves are sent to the child
        # processes here; this is the call reported to hang with
        # ProcessPoolExecutor.
        futures = [
            executor.submit(work, model_clone, seq)
            for model_clone, seq in zip(model_list, seqences)
        ]
        # Collect in submission order so outputs line up with the inputs;
        # the original discarded every result and returned an empty array.
        seq_out = [future.result() for future in futures]
        t1 = time.perf_counter()
    return np.reshape(seq_out, (-1,)), t1 - t0
if __name__ == "__main__":
    # Build the model once in the parent process, then fan the
    # prediction work out over four sequences.
    model = simple_model()
    out = worker(model, num_of_seq=4)
`simple_model` creates the model. `clone_model` clones a TensorFlow model. `work` represents an MWE of possible work. `worker` assigns the work in parallel.
This does not work: it just gets stuck and does not produce any output. However, the above code works if I replace `ProcessPoolExecutor` with `ThreadPoolExecutor`, but that does not provide any speedup (it could be that it is not actually running the workers in parallel).
From my understanding, the problem lies in the `model` argument that is passed to `executor.submit`:
future_to_samples = {executor.submit(work, model, seq): seq for model, seq in zip(model_list, seqences)}
I modified the code so that it sends the path of the saved model, rather than the model itself, to the child processes. And it works.
import tensorflow as tf
from tensorflow import keras
import numpy as np
import concurrent.futures
import time
# gpus = tf.config.experimental.list_physical_devices('GPU')
# if len(gpus) > 0:
# print(f'GPUs {gpus}')
# try: tf.config.experimental.set_memory_growth(gpus[0], True)
# except RuntimeError: pass
def simple_model():
    """Build and compile a small two-layer dense Keras model.

    The network takes a single scalar feature, passes it through a
    10-unit dense layer and a 1-unit sigmoid output layer, and is compiled
    with SGD and mean-squared-error loss.

    Returns:
        The compiled ``keras.models.Sequential`` model.
    """
    model = keras.models.Sequential([
        keras.layers.Dense(units=10, input_shape=[1]),
        keras.layers.Dense(units=1, activation='sigmoid'),
    ])  # the original snippet never closed this list/call
    model.compile(optimizer='sgd', loss='mean_squared_error')
    return model
def clone_model(model):
    """Return an independent copy of ``model`` with the same weights.

    ``tf.keras.models.clone_model`` only reproduces the architecture and
    creates freshly initialised weights, so the weights are copied over
    explicitly to make the clone predict exactly like the original.

    Args:
        model: The Keras model to duplicate.

    Returns:
        A new Keras model with the same architecture and weight values.
    """
    model_clone = tf.keras.models.clone_model(model)
    # Without this the clone would carry new, randomly initialised weights.
    model_clone.set_weights(model.get_weights())
    return model_clone
def work(model_path, seq):
    """Load the model stored at ``model_path`` and predict on ``seq``.

    The model is loaded inside the function, so only the path string and
    the input sequence are passed in by the caller.

    Args:
        model_path: Path handed to ``tf.keras.models.load_model``.
        seq: The input handed to the loaded model's ``predict``.

    Returns:
        Whatever ``predict`` returns for ``seq``.
    """
    loaded_model = tf.keras.models.load_model(model_path)
    predictions = loaded_model.predict(seq)
    return predictions
def worker(model, num_of_seq=4):
    """Predict on ``num_of_seq`` sequences in parallel and time the run.

    The model is written to ``./simple_model.h5`` and only that path is
    handed to the child processes, each of which loads the model itself
    (see ``work``). The input is the integers ``0 .. num_of_seq*10 - 1``
    split into ``num_of_seq`` rows of ten values.

    Args:
        model: The Keras model to save and evaluate in the workers.
        num_of_seq: Number of sequences / parallel tasks.

    Returns:
        A tuple ``(predictions, elapsed)`` where ``predictions`` is the
        flattened 1-D array of all task outputs in submission order and
        ``elapsed`` is the wall-clock time in seconds.
    """
    seqences = np.arange(0, num_of_seq * 10).reshape(num_of_seq, -1)
    model_savepath = './simple_model.h5'
    # The workers load the model from disk, so it has to be written first;
    # the original never saved it and relied on a pre-existing file.
    model.save(model_savepath)
    with concurrent.futures.ProcessPoolExecutor(max_workers=None) as executor:
        t0 = time.perf_counter()
        futures = [
            executor.submit(work, model_savepath, seq) for seq in seqences
        ]
        # Collect in submission order so outputs line up with the inputs;
        # the original discarded every result and returned an empty array.
        seq_out = [future.result() for future in futures]
        t1 = time.perf_counter()
    return np.reshape(seq_out, (-1,)), t1 - t0
if __name__ == "__main__":
    # Build the model once in the parent process, then run the
    # path-based parallel prediction over 400 sequences.
    model = simple_model()
    num_of_seq = 400
    out = worker(model, num_of_seq=num_of_seq)