What is an efficient way that I can trigger some function once the length of a list changes by a certain amount?
I have a nested list to which I add data 100 times per second, and I want to trigger a function once the length of the list has increased by some value. I tried doing this with an `if` statement inside a `while` loop (see `my_loop()` below). This works, but this seemingly simple operation takes up 100% of one of my CPU cores. It seems to me that constantly querying the size of the list is the limiting factor of the script (adding data to the list in the `while` loop is not resource-intensive).
Here is what I have tried so far:
```python
from threading import Event, Thread
import time


def add_indefinitely(list_, kill_signal):
    """
    list_ : list
        List to which data is added.
    kill_signal : threading.Event
    """
    while not kill_signal.is_set():
        list_.append([1] * 32)
        time.sleep(0.01)  # Equivalent to 100 Hz.


def my_loop(buffer_len, kill_signal):
    """
    buffer_len : int, float
        Size of the data buffer in seconds. Gets converted to n_samples
        by multiplying by the sampling frequency (i.e., 100).
    kill_signal : threading.Event
    """
    buffer_len *= 100
    b0 = len(list_)
    while not kill_signal.is_set():
        if len(list_) - b0 > buffer_len:
            b0 = len(list_)
            print("Len of list_ is {}".format(b0))


list_ = []
kill_signal = Event()
buffer_len = 2  # Print something every 2 seconds.

data_thread = Thread(target=add_indefinitely, args=(list_, kill_signal))
data_thread.start()

loop_thread = Thread(target=my_loop, args=(buffer_len, kill_signal))
loop_thread.start()


def stop_all():
    """Stop appending to and querying the list.

    SO users, call this function to clean up!
    """
    kill_signal.set()
    data_thread.join()
    loop_thread.join()
```
Example output:
```
Len of list_ is 202
Len of list_ is 403
Len of list_ is 604
Len of list_ is 805
Len of list_ is 1006
```
Answer
It’s not very safe to access a list from two threads, so I’ll suggest a safer way to communicate between threads. In CPython, your code won’t corrupt the contents of the list, but you might not get exactly 200 items each time you process a batch. If you started removing items from the list in `my_loop()`, you could run into trouble, and on Python implementations without the GIL you could have even more trouble.
Before that, though, here’s the smallest change I can think of to solve the problem you asked about: CPU usage. I just added a sleep to `my_loop()` and cleaned up the buffer calculations, so it now reports a mostly steady 201, 401, 601. Occasionally, I see a 1002.
```python
from threading import Event, Thread
import time


def add_indefinitely(list_, kill_signal):
    """
    list_ : list
        List to which data is added.
    kill_signal : threading.Event
    """
    while not kill_signal.is_set():
        list_.append([1] * 32)
        time.sleep(0.01)  # Equivalent to 100 Hz.


def my_loop(buffer_len, kill_signal):
    """
    buffer_len : int, float
        Size of the data buffer in seconds. Gets converted to n_samples
        by multiplying by the sampling frequency (i.e., 100).
    kill_signal : threading.Event
    """
    buffer_len *= 100
    b0 = len(list_)
    while not kill_signal.is_set():
        time.sleep(0.01)  # Sleep between checks so we don't pin a core.
        if len(list_) - b0 >= buffer_len:
            b0 += buffer_len
            print("Len of list_ is {}".format(len(list_)))


list_ = []
kill_signal = Event()
buffer_len = 2  # Print something every 2 seconds.

data_thread = Thread(target=add_indefinitely, args=(list_, kill_signal))
data_thread.start()

loop_thread = Thread(target=my_loop, args=(buffer_len, kill_signal))
loop_thread.start()


def stop_all():
    """Stop appending to and querying the list.

    SO users, call this function to clean up!
    """
    kill_signal.set()
    data_thread.join()
    loop_thread.join()


time.sleep(30)
stop_all()
```
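A small side note on that sleep: `threading.Event` already has a `wait(timeout)` method that sleeps like `time.sleep` but returns `True` immediately as soon as the event is set, so a polling loop can shut down promptly instead of finishing its current nap. A minimal sketch (the `worker` and `results` names are just for illustration, not from the code above):

```python
from threading import Event, Thread
import time

kill_signal = Event()
results = []


def worker():
    # Event.wait(0.01) sleeps up to 10 ms like time.sleep(0.01), but
    # returns True right away once kill_signal.set() is called.
    while not kill_signal.wait(0.01):
        results.append(1)


t = Thread(target=worker)
t.start()
time.sleep(0.1)   # Let the worker run for a bit.
kill_signal.set() # The worker exits almost immediately.
t.join()
```

This keeps the CPU usage of the polling loop near zero while making shutdown as responsive as the event itself.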
Now, to do this safely, I suggest you use a queue. That will allow many threads to read or write to the queue, and it will handle the communication. If a thread tries to read from an empty queue, it just blocks until some other thread adds an item to the queue.
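To see that blocking behaviour in isolation: a `get()` on an empty `queue.Queue` simply waits, without spinning, until some producer puts an item. A tiny demonstration (the `producer` name and timings are illustrative):

```python
from queue import Queue
from threading import Thread
import time

q = Queue()


def producer():
    time.sleep(0.05)  # The consumer blocks on get() during this delay.
    q.put("hello")


Thread(target=producer).start()
start = time.monotonic()
item = q.get()  # Blocks until the producer puts an item; no busy-waiting.
elapsed = time.monotonic() - start
print(item)  # hello
```

The consumer thread uses essentially no CPU while it waits, which is exactly what the polling loop in the question was missing.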
I wasn’t sure exactly what you wanted to do with the items, so I just put them in a list and left them there. However, now that the list is only being accessed by one thread, it’s safe to clear it out after each batch of items is processed.
Because `my_loop()` now blocks on the queue, it won’t necessarily notice when you set the kill signal. Instead, I used a sentinel value of `None` in the request queue to tell it to shut down. If that doesn’t work for you, you can use a timeout when getting an item from the queue, check the kill signal, and then try getting an item again.
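That timeout variant could look something like this (a sketch, not part of the answer's code; the `consume`, `seen`, and `sig` names are made up for the example):

```python
from queue import Queue, Empty
from threading import Event, Thread
import time


def consume(request_queue, kill_signal, handle):
    # Block on the queue, but wake every 0.1 s to re-check the kill signal.
    while not kill_signal.is_set():
        try:
            item = request_queue.get(timeout=0.1)
        except Empty:
            continue  # Nothing arrived in time; loop and check the signal.
        handle(item)


q = Queue()
sig = Event()
seen = []

t = Thread(target=consume, args=(q, sig, seen.append))
t.start()
q.put("a")
q.put("b")
time.sleep(0.3)  # Give the consumer a moment to drain the queue.
sig.set()        # Shut the consumer down via the event, no sentinel needed.
t.join()
print(seen)  # ['a', 'b']
```

The trade-off versus the sentinel is a small wake-up cost every timeout interval in exchange for not having to reserve a special value in the data stream.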
```python
from threading import Event, Thread
from queue import Queue
import time


def add_indefinitely(request_queue, kill_signal):
    """
    request_queue : queue.Queue
        Queue to which data is added.
    kill_signal : threading.Event
    """
    while not kill_signal.is_set():
        request_queue.put([1] * 32)
        time.sleep(0.01)  # Equivalent to 100 Hz.
    request_queue.put(None)  # Sentinel: signal the consumer to shut down.


def my_loop(buffer_len, kill_signal):
    """
    buffer_len : int, float
        Size of the data buffer in seconds. Gets converted to n_samples
        by multiplying by the sampling frequency (i.e., 100).
    kill_signal : threading.Event
    """
    received_items = []  # Replaces list_; only this thread touches it.
    buffer_len *= 100
    while True:
        item = request_queue.get()  # Blocks until an item is available.
        if item is None:
            break
        received_items.append(item)
        if len(received_items) % buffer_len == 0:
            print("Len of received_items is {}".format(len(received_items)))


request_queue = Queue()
kill_signal = Event()
buffer_len = 2  # Print something every 2 seconds.

data_thread = Thread(target=add_indefinitely, args=(request_queue, kill_signal))
data_thread.start()

loop_thread = Thread(target=my_loop, args=(buffer_len, kill_signal))
loop_thread.start()


def stop_all():
    """Stop appending to and reading from the queue.

    SO users, call this function to clean up!
    """
    kill_signal.set()
    data_thread.join()
    loop_thread.join()


time.sleep(30)
stop_all()
```
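If you do want to clear the buffer after each batch, the consumer loop can be factored into a small generator; since no other thread ever sees the batch list, discarding it is safe. This is a hypothetical helper (`drain_in_batches` is not part of the answer above), shown with a pre-filled queue so it runs standalone:

```python
from queue import Queue


def drain_in_batches(q, batch_size):
    """Yield lists of up to batch_size items from q; stop on a None sentinel."""
    batch = []
    while True:
        item = q.get()
        if item is None:
            if batch:
                yield batch  # Flush the final, possibly short, batch.
            return
        batch.append(item)
        if len(batch) == batch_size:
            yield batch
            batch = []  # Safe to discard: only this thread holds a reference.


q = Queue()
for i in range(5):
    q.put(i)
q.put(None)  # Sentinel: producer is done.

batches = list(drain_in_batches(q, 2))
print(batches)  # [[0, 1], [2, 3], [4]]
```

Each batch is handed off whole and the working list is simply replaced, so memory no longer grows without bound the way `received_items` does in the code above.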