Skip to content
Advertisement

How to trigger a function once a condition is met

What is an efficient way that I can trigger some function once the length of a list changes by a certain amount?

I have a nested list to which I add data 100 times per second, and I want to trigger a function once the length of the list increased by some value. I tried doing this with an if statement inside a while loop (see my_loop() below). This works, but this seemingly simple operation takes up 100% of one of my CPU cores. It seems to me that constantly querying the size of the list is the limiting factor of the script (adding data to the list in the while loop is not resource-intensive).

Here is what I have tried so far:

from threading import Event, Thread
import time

def add_indefinitely(list_, kill_signal):
    """
    list_ : list
        List to which data is added.
    kill_signal : threading.Event
    """
    while not kill_signal.is_set():
        list_.append([1] * 32)
        time.sleep(0.01)  # Equivalent to 100 Hz.


def my_loop(buffer_len, kill_signal):
    """
    buffer_len: int, float
        Size of the data buffer in seconds. Gets converted to n_samples
        by multiplying by the sampling frequency (i.e., 100).
    kill_signal : threading.Event
    """
    buffer_len *= 100
    b0 = len(list_)
    while not kill_signal.is_set():
        if len(list_) - b0 > buffer_len:
            b0 = len(list_)
            print("Len of list_ is {}".format(b0))


list_ = []
kill_signal = Event()
buffer_len = 2  # Print something every 2 seconds.


data_thread = Thread(target=add_indefinitely, args=(list_, kill_signal))
data_thread.start()

loop_thread = Thread(target=my_loop, args=(buffer_len, kill_signal))
loop_thread.start()


def stop_all():
    """Stop appending to and querying the list.
    SO users, call this function to clean up!
    """
    kill_signal.set()
    data_thread.join()
    loop_thread.join()

Example output:

Len of list_ is 202
Len of list_ is 403
Len of list_ is 604
Len of list_ is 805
Len of list_ is 1006

Advertisement

Answer

It’s not very safe to access a list from two threads, so I’ll suggest a safer way to communicate between threads. In CPython, your code won’t corrupt the contents of the list, but you might not get exactly 200 items each time you process a batch. If you started removing items from the list in my_loop(), you could run into trouble. If you use other versions of Python without the GIL, you could have more trouble.

Before that, though, here’s the smallest change I can think of to solve the problem you asked about: CPU usage. I just added a sleep to my_loop() and cleaned up the buffer calculations so it now reports a mostly steady 201, 401, 601. Occasionally, I see a 1002.

from threading import Event, Thread
import time

def add_indefinitely(list_, kill_signal):
    """
    list_ : list
        List to which data is added.
    kill_signal : threading.Event
    """
    while not kill_signal.is_set():
        list_.append([1] * 32)
        time.sleep(0.01)  # Equivalent to 100 Hz.


def my_loop(buffer_len, kill_signal):
    """
    buffer_len: int, float
        Size of the data buffer in seconds. Gets converted to n_samples
        by multiplying by the sampling frequency (i.e., 100).
    kill_signal : threading.Event
    """
    buffer_len *= 100
    b0 = len(list_)
    while not kill_signal.is_set():
        time.sleep(0.01)
        if len(list_) - b0 >= buffer_len:
            b0 += buffer_len
            print("Len of list_ is {}".format(len(list_)))


list_ = []
kill_signal = Event()
buffer_len = 2  # Print something every 2 seconds.


data_thread = Thread(target=add_indefinitely, args=(list_, kill_signal))
data_thread.start()

loop_thread = Thread(target=my_loop, args=(buffer_len, kill_signal))
loop_thread.start()


def stop_all():
    """Stop appending to and querying the list.
    SO users, call this function to clean up!
    """
    kill_signal.set()
    data_thread.join()
    loop_thread.join()

time.sleep(30)
stop_all()

Now, to do this safely, I suggest you use a queue. That will allow many threads to read or write to the queue, and it will handle the communication. If a thread tries to read from an empty queue, it just blocks until some other thread adds an item to the queue.

I wasn’t sure exactly what you wanted to do with the items, so I just put them in a list and left them there. However, now that the list is only being accessed by one thread, it’s safe to clear it out after each batch of 100 items are processed.

Because the my_loop() is now blocking, it won’t necessarily notice when you set the kill signal. Instead, I used a sentry value of None in the request queue to tell it to shut down. If that doesn’t work for you, you can use a timeout when getting an item from the queue, check the kill signal, and then try getting an item again.

from threading import Event, Thread
from queue import Queue
import time

def add_indefinitely(request_queue, kill_signal):
    """
    list_ : list
        List to which data is added.
    kill_signal : threading.Event
    """
    while not kill_signal.is_set():
        request_queue.put([1] * 32)
        time.sleep(0.01)  # Equivalent to 100 Hz.
    request_queue.put(None)  # Signal to shut down


def my_loop(buffer_len, kill_signal):
    """
    buffer_len: int, float
        Size of the data buffer in seconds. Gets converted to n_samples
        by multiplying by the sampling frequency (i.e., 100).
    kill_signal : threading.Event
    """
    received_items = []  # replaces list_
    buffer_len *= 100
    while True:
        item = request_queue.get()
        if item is None:
            break
        received_items.append(item)
        if len(received_items) % buffer_len == 0:
            print("Len of received_items is {}".format(len(received_items)))


request_queue = Queue()
kill_signal = Event()
buffer_len = 2  # Print something every 2 seconds.


data_thread = Thread(target=add_indefinitely, args=(request_queue, kill_signal))
data_thread.start()

loop_thread = Thread(target=my_loop, args=(buffer_len, kill_signal))
loop_thread.start()


def stop_all():
    """Stop appending to and querying the list.
    SO users, call this function to clean up!
    """
    kill_signal.set()
    data_thread.join()
    loop_thread.join()

time.sleep(30)
stop_all()
User contributions licensed under: CC BY-SA
5 People found this is helpful
Advertisement