I’m new to concurrent programming.
I’d like to execute three tasks repeatedly. The first two should run all the time; the third should run every hour or so. The first two tasks can run in parallel, but I always want to pause them while the third task is running.
Here’s the skeleton of what I’ve tried:
    import threading
    import time

    flock = threading.Lock()
    glock = threading.Lock()

    def f():
        while True:
            with flock:
                print('f')
                time.sleep(1)

    def g():
        while True:
            with glock:
                print('g')
                time.sleep(1)

    def h():
        while True:
            with flock:
                with glock:
                    print('h')
                    time.sleep(5)

    threading.Thread(target=f).start()
    threading.Thread(target=g).start()
    threading.Thread(target=h).start()
I would expect this code to print an f and a g every second, and an h about every five seconds. However, when I run it, it takes around 12 f’s and 12 g’s before I start seeing any h’s. It looks like the first two threads constantly release and re-acquire their locks while the third thread is left out of the loop.
- Why is that? When the third thread tries to acquire a currently held lock, and it is then released, shouldn’t acquisition immediately succeed instead of the first/second thread immediately acquiring it again? I am probably misunderstanding something.
- What would be a good way to achieve what I want?
Note: moving the time.sleep(1) calls out of the with flock/glock blocks works for this simple example, but apparently not for my real application, where the threads spend most of their time doing the actual operations. Even when the first two threads sleep for a second after each pass through the loop body, with the lock released, the third task still never gets executed.
Answer
How about doing it with threading.Event objects:
    import threading
    import time
    import logging

    logger = logging.getLogger(__name__)

    def f(resume, is_waiting, name):
        while True:
            if not resume.is_set():
                is_waiting.set()
                logger.debug('{n} pausing...'.format(n=name))
                resume.wait()
                is_waiting.clear()
            logger.info(name)
            time.sleep(1)

    def h(resume, waiters):
        while True:
            logger.debug('halt')
            resume.clear()
            for i, w in enumerate(waiters):
                logger.debug('{i}: wait for worker to pause'.format(i=i))
                w.wait()
            logger.info('h begin')
            time.sleep(2)
            logger.info('h end')
            logger.debug('resume')
            resume.set()
            time.sleep(5)

    logging.basicConfig(level=logging.DEBUG,
                        format='[%(asctime)s %(threadName)s] %(message)s',
                        datefmt='%H:%M:%S')

    # set means resume; clear means halt
    resume = threading.Event()
    resume.set()

    waiters = []
    for name in 'fg':
        is_waiting = threading.Event()
        waiters.append(is_waiting)
        threading.Thread(target=f, args=(resume, is_waiting, name)).start()
    threading.Thread(target=h, args=(resume, waiters)).start()
yields
    [07:28:55 Thread-1] f
    [07:28:55 Thread-2] g
    [07:28:55 Thread-3] halt
    [07:28:55 Thread-3] 0: wait for worker to pause
    [07:28:56 Thread-1] f pausing...
    [07:28:56 Thread-2] g pausing...
    [07:28:56 Thread-3] 1: wait for worker to pause
    [07:28:56 Thread-3] h begin
    [07:28:58 Thread-3] h end
    [07:28:58 Thread-3] resume
    [07:28:58 Thread-1] f
    [07:28:58 Thread-2] g
    [07:28:59 Thread-1] f
    [07:28:59 Thread-2] g
    [07:29:00 Thread-1] f
    [07:29:00 Thread-2] g
    [07:29:01 Thread-1] f
    [07:29:01 Thread-2] g
    [07:29:02 Thread-1] f
    [07:29:02 Thread-2] g
    [07:29:03 Thread-3] halt
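The same pause-and-drain idea can also be expressed with a single threading.Condition. This is essentially a writer-preferring readers-writer lock, with f and g acting as readers and h as the writer. The sketch below is not from the original answer: PauseGate and its enter/exit/pause/resume methods, as well as the worker/controller/run_demo helpers, are hypothetical names, and the timings are shortened so the demo finishes in well under a second. As with the Event version, workers only check the gate between units of work, so h waits for in-flight work to finish rather than interrupting it.

```python
import threading
import time


class PauseGate:
    """Hypothetical helper (not a stdlib class): workers call enter()/exit()
    around each unit of work; a controller calls pause() to stop new work and
    wait for in-flight work to finish, then resume() to let workers continue."""

    def __init__(self):
        self._cond = threading.Condition()
        self._active = 0      # number of workers currently between enter() and exit()
        self._paused = False  # True while the controller wants exclusive time

    def enter(self):
        with self._cond:
            while self._paused:          # block while a pause is requested or running
                self._cond.wait()
            self._active += 1

    def exit(self):
        with self._cond:
            self._active -= 1
            if self._active == 0:
                self._cond.notify_all()  # may wake a controller waiting in pause()

    def pause(self):
        with self._cond:
            self._paused = True          # refuse new entries first (writer preference)
            while self._active > 0:      # then wait for in-flight work to drain
                self._cond.wait()

    def resume(self):
        with self._cond:
            self._paused = False
            self._cond.notify_all()      # wake workers blocked in enter()


def worker(gate, name, log, stop):
    while not stop.is_set():
        gate.enter()
        try:
            log.append(name)
            time.sleep(0.01)             # stand-in for the real work
        finally:
            gate.exit()


def controller(gate, log, stop, rounds, interval=0.05):
    for _ in range(rounds):
        time.sleep(interval)             # "every hour or so", shortened for the demo
        gate.pause()
        try:
            log.append('h begin')
            time.sleep(0.02)             # exclusive work: no worker is inside the gate
            log.append('h end')
        finally:
            gate.resume()
    stop.set()                           # let the workers finish their loops


def run_demo(rounds=3):
    gate, log, stop = PauseGate(), [], threading.Event()
    threads = [threading.Thread(target=worker, args=(gate, n, log, stop))
               for n in 'fg']
    threads.append(threading.Thread(target=controller,
                                    args=(gate, log, stop, rounds)))
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return log


if __name__ == '__main__':
    print(run_demo())
```

run_demo() returns the interleaved log; between each 'h begin' and the following 'h end' there should be no 'f' or 'g' entries. Setting _paused before waiting is what gives h priority: once a pause has been requested, a worker that finishes a unit of work cannot slip back in ahead of h.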
(In response to a question in the comments) The last code snippet in this answer tries to measure how long it takes the h-thread to acquire each lock from the other worker threads. It seems to show that even while h is waiting to acquire a lock, the worker thread holding it may, with fairly high probability, release it and immediately reacquire it. h is given no priority just because it has been waiting longer.
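If you want waiting threads to be served in arrival order, you have to build that yourself, since threading.Lock makes no fairness guarantee. A minimal sketch, assuming a FIFO "ticket lock" is acceptable for your workload (FairLock and demo are hypothetical names, not part of the threading module):

```python
import threading
import time


class FairLock:
    """Hypothetical FIFO ("ticket") lock, not part of the threading module.
    Threads get the lock in the order they called acquire(), so a thread that
    releases and immediately re-acquires queues behind anyone already waiting."""

    def __init__(self):
        self._cond = threading.Condition(threading.Lock())
        self._next_ticket = 0  # next ticket number to hand out
        self._serving = 0      # ticket number currently allowed to hold the lock

    def acquire(self):
        with self._cond:
            my_ticket = self._next_ticket
            self._next_ticket += 1
            while my_ticket != self._serving:
                self._cond.wait()

    def release(self):
        with self._cond:
            self._serving += 1       # hand the lock to the next ticket in line
            self._cond.notify_all()

    def __enter__(self):
        self.acquire()
        return self

    def __exit__(self, exc_type, exc, tb):
        self.release()


def demo():
    """Main thread holds the lock, a second thread queues for it, then the
    main thread releases and immediately tries to re-acquire."""
    lock, order = FairLock(), []

    def waiter():
        with lock:
            order.append('waiter')

    lock.acquire()
    t = threading.Thread(target=waiter)
    t.start()
    while lock._next_ticket < 2:     # demo only: wait until the waiter has queued
        time.sleep(0.001)
    lock.release()
    with lock:                       # re-acquire at once; queues behind the waiter
        order.append('main')
    t.join()
    return order
```

demo() returns ['waiter', 'main']: the main thread's immediate re-acquire gets a later ticket than the thread that was already waiting. Used in place of flock and glock in the question's code, a lock like this should stop h from being starved, at the cost of a context switch on every hand-off; h still blocks one worker while it waits for the other lock.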
David Beazley has presented at PyCon about problems related to threading and the GIL. Here is a pdf of the slides. It is a fascinating read and may help explain this as well.
    import threading
    import time
    import logging

    logger = logging.getLogger(__name__)

    def f(lock, n):
        while True:
            with lock:
                logger.info(n)
                time.sleep(1)

    def h(locks):
        while True:
            t = time.time()
            for n, lock in enumerate(locks):
                lock.acquire()
                t2 = time.time()
                logger.info('h acquired {n}: {d}'.format(n=n, d=t2 - t))
                t = t2
            t2 = time.time()
            logger.info('h {d}'.format(d=t2 - t))
            t = t2
            for lock in locks:
                lock.release()
            time.sleep(5)

    logging.basicConfig(level=logging.DEBUG,
                        format='[%(asctime)s %(threadName)s] %(message)s',
                        datefmt='%H:%M:%S')

    locks = []
    N = 5
    for n in range(N):
        lock = threading.Lock()
        locks.append(lock)
        t = threading.Thread(target=f, args=(lock, n))
        t.start()

    threading.Thread(target=h, args=(locks,)).start()