In the last article, you got an introduction to programming using threads in Python. In today’s article, we will discuss how to synchronize multiple threads using some synchronization primitives.

The recipes are:

  1. Synchronizing threads using a lock
  2. Synchronization of threads using condition objects
  3. Managing resources using a semaphore
  4. Using a queue to synchronize and share data between threads

1. Synchronizing threads using Lock

The $Lock$ is the most primitive synchronization primitive in Python (pun intended). It can be used to manage sequential or time-based dependencies between multiple threads.

Let us take a simple example to illustrate this.

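import threading

class NumberThread(threading.Thread):
    """ A thread generating sequence of numbers """
    
    def __init__(self, start=1, diff=2, end=50, num_list=None):
        self._start = start
        self._end = end
        # Honor the diff argument instead of hard-coding a step of 2
        self._diff = diff
        # A None default avoids sharing one list across all instances
        self._numlist = num_list if num_list is not None else []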
        super().__init__(group=None)
        
    def run(self):

        for i in range(self._start, self._end, self._diff):
            print('[{}] - {}'.format(self.name, i))
            self._numlist.append(i)

The above class generates a sequence of numbers and appends each one to the list _numlist. Given no arguments, it generates the odd numbers below 50.

Let us run this with two threads, one generating the odd numbers and the other the even numbers below 50. We want the final sequence in order.

>>> num_list = []
>>> t1 = NumberThread(num_list=num_list)
>>> t2 = NumberThread(start=2, num_list=num_list)
>>> t1.start();t2.start()
[Thread-1] - 1
[Thread-1] - 3
[Thread-1] - 5
[Thread-1] - 7
[Thread-2] - 2
[Thread-1] - 9
>>> [Thread-1] - 11
[Thread-2] - 4
[Thread-1] - 13
[Thread-2] - 6
[Thread-1] - 15      
...

You can see the messages are getting printed in no particular order as threads 1 and 2 go about their work. Let’s see the final list (extra line breaks added for readability).

>>> num_list
[1, 3, 5, 7, 9, 2, 11, 4, 13, 6, 15, 8, 17, 10, 19, 12, 21, 14, 23, 16, 25, 18,
27, 20, 29, 22, 31, 24, 33, 26, 35, 28, 37, 30, 39, 32, 41, 34, 43, 36, 45, 38,
47, 40, 49, 42, 44, 46, 48] 

The list is in no particular order as the threads were not synchronized. If we want the data to be produced in order (odd numbers followed by even) we need to synchronize the threads.

This is where a $lock$ is helpful. Let us add an extra argument to the __init__ method to accept a lock. We will also modify the $run$ method to acquire the lock before the for loop and release it after the loop finishes.

Our modified NumberThread class looks like this:

class NumberThread(threading.Thread):
    """ A thread generating sequence of numbers """
    
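    def __init__(self, lock, start=1, diff=2, end=50, num_list=None):
        self._start = start
        self._end = end
        # Honor the diff argument instead of hard-coding a step of 2
        self._diff = diff
        # A None default avoids sharing one list across all instances
        self._numlist = num_list if num_list is not None else []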
        self._lock = lock
        super().__init__(group=None)
        
    def run(self):

        with self._lock:
            for i in range(self._start, self._end, self._diff):
                print('[{}] - {}'.format(self.name, i))
                self._numlist.append(i)

A quick tour.

  1. The thread accepts a new lock argument - which will be a $threading.Lock()$ object that we will pass from outside.
  2. The lock object in Python implements the context-management protocol. Hence you can enclose blocks of code inside a $with$ $lock$ statement which will automatically acquire and release the lock around the code. This synchronizes this block of code across threads.
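As a rough sketch of what the $with$ statement does for a lock, the two helper functions below guard the same shared list in equivalent ways. The function names and the results list are made up for illustration and are not part of the NumberThread class.

```python
import threading

lock = threading.Lock()
results = []

def append_with_statement(value):
    # The lock is acquired on entry and released on exit,
    # even if the body raises an exception.
    with lock:
        results.append(value)

def append_explicit(value):
    # The same logic written out by hand.
    lock.acquire()
    try:
        results.append(value)
    finally:
        lock.release()

append_with_statement(1)
append_explicit(2)
print(results, lock.locked())
```

The $with$ form is usually preferred because it is impossible to forget the release.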

After this change,

>>> num_list = []
>>> lock = threading.Lock()
>>> t1 = NumberThread(num_list=num_list, lock=lock)
>>> t2 = NumberThread(start=2, num_list=num_list, lock=lock)
>>> t1.start();t2.start()
[Thread-1] - 1
[Thread-1] - 3
[Thread-1] - 5
[Thread-1] - 7
[Thread-1] - 9
>>> [Thread-1] - 11
[Thread-1] - 13
[Thread-1] - 15
[Thread-1] - 17
[Thread-1] - 19
[Thread-1] - 21
...
>>> num_list
[1, 3, 5, 7, 9, 11, 13, 15, 17, 19, 21, 23, 25, 27, 29, 31, 33, 35, 37, 39, 41,
43, 45, 47, 49, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34,
36, 38, 40, 42, 44, 46, 48]

The threads are now synchronized and the resulting list is in the expected order.

Locks, however, can handle only the most basic synchronization needs, since we have no fine-grained control over when a thread acquires a lock. For finer control and thread wake-up semantics, the Condition object is useful.

2. Synchronization among multiple threads using Condition objects

The following is reportedly a common concurrency question in software company interviews. The problem is stated below.

There are 3 threads. The first one generates and prints the sequence

0,3,6, ...

The second one prints,

1,4,7, ...

And the third one,

2,5,8, ... 

Write code to create the threads and run them and make sure they print

0,1,2,3,4,5,... in that order.

It is pretty easy to write the code which generates the sequences. Here is the code.

import threading
import time

class Thread1(threading.Thread):

    def run(self):
        # Print 0,3,6...
        for i in range(0, 100, 3):
            print('[{}] - {}'.format(self.name, i))
            time.sleep(1)

class Thread2(threading.Thread):
        
    def run(self):
        # Print 1,4,7...
        for i in range(1, 100, 3):
            print('[{}] - {}'.format(self.name, i))
            time.sleep(1)

class Thread3(threading.Thread):

    def run(self):
        # Print 2,5,8...
        for i in range(2, 100, 3):
            print('[{}] - {}'.format(self.name, i))
            time.sleep(1)

However, once you start the threads, you will see that the output becomes jumbled after the first few numbers.

[Thread-1] - 0
[Thread-2] - 1
[Thread-3] - 2
[Thread-2] - 4
[Thread-1] - 3
[Thread-3] - 5
[Thread-2] - 7
[Thread-1] - 6
[Thread-3] - 8
[Thread-2] - 10
[Thread-1] - 9
[Thread-3] - 11

We cannot use a lock here as we need fine-grained synchronization inside the for loop - not outside it.

A Condition object wraps a lock and exposes the same acquire and release methods. On top of that, it provides wait and notify semantics, whereby one thread can wake up another thread that is waiting on the condition.
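To make the wait and notify semantics concrete, here is a minimal sketch with one waiting thread and one notifying thread. The names state, seen, waiter and notifier are invented for this illustration. It uses wait_for with a predicate, which re-checks the shared flag after every wake-up, so the example works regardless of which thread gets to the condition first.

```python
import threading

cond = threading.Condition()
state = {'ready': False}   # shared flag, protected by the condition's lock
seen = []

def waiter():
    with cond:
        # wait_for releases the lock while blocked and returns
        # once the predicate is true.
        cond.wait_for(lambda: state['ready'])
        seen.append('woke up')

def notifier():
    with cond:
        state['ready'] = True
        cond.notify()      # wake one thread waiting on cond

t = threading.Thread(target=waiter)
t.start()
notifier()
t.join()
print(seen)
```

Both wait and notify must be called while holding the condition's lock, which is why each call sits inside a $with$ $cond$ block.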

Let us analyze the problem and sketch a solution before showing the code.

  1. We need threads 1,2 and 3 to be synchronized in the order 1,2,3,1,2,3,1 … .
  2. Thread1 should print and notify Thread2 which is waiting on it, Thread2 should print and notify Thread3 which is waiting on it, Thread3 should print and notify Thread1 which is waiting on it. And so on.

This can be done with 3 condition objects each signaling the other in a circular loop. Here is the code.

class Thread1(threading.Thread):

    def run(self):
        # Print 0,3,6...
        for i in range(0, 100, 3):
            with cond3:
                cond3.wait()
            print('[{}] - {}'.format(self.name, i))
            time.sleep(1)
            # Thread2 waiting on this
            with cond1:
                cond1.notify()

class Thread2(threading.Thread):

    def run(self):
        # Print 1,4,7...
        for i in range(1, 100, 3):
            with cond1:
                cond1.wait()
            print('[{}] - {}'.format(self.name, i))
            time.sleep(1)
            # Thread3 waiting on this            
            with cond2:
                cond2.notify()

class Thread3(threading.Thread):
        
    def run(self):
        # Print 2,5,8...
        for i in range(2, 100, 3):
            with cond2:
                cond2.wait()
            print('[{}] - {}'.format(self.name, i))
            time.sleep(1)
            # Thread1 waiting on this                        
            with cond3:
                cond3.notify()

if __name__ == "__main__":
    cond1 = threading.Condition()
    cond2 = threading.Condition()
    cond3 = threading.Condition()

    t1 = Thread1()
    t2 = Thread2()
    t3 = Thread3()

    t1.start()
    t2.start()
    t3.start()

    with cond3:
        cond3.notify()

NOTE: We are using globals here to make the code easier to read and understand. Globals are rarely a good idea in real Python code.

A brief explanation.

  1. We create 3 condition objects and create a circular dependency between the 3 threads using them.
  2. This enforces the sequential logic of threads - 1,2,3,1,2,3…
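  3. To kick-start the threads, the main thread notifies cond3 on which Thread1 is waiting. This assumes Thread1 has already reached cond3.wait(); a notify() issued before the matching wait() is lost, and the threads would then block forever.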

Here is the output of the modified code - with the numbers in the correct order.

[Thread-1] - 0
[Thread-2] - 1
[Thread-3] - 2
[Thread-1] - 3
[Thread-2] - 4
[Thread-3] - 5
[Thread-1] - 6
[Thread-2] - 7
[Thread-3] - 8
[Thread-1] - 9
[Thread-2] - 10
[Thread-3] - 11
...
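A possible variation, sketched here under the assumption that a single shared condition is acceptable, replaces the three condition objects with one Condition and a turn counter. The names worker, turn and printed are made up for this example, and the range is shortened to 12 to keep the run brief. Because each thread waits on a predicate rather than on a bare notification, a notify that arrives early cannot be lost.

```python
import threading

cond = threading.Condition()
turn = {'value': 0}    # index of the worker allowed to print next
printed = []

def worker(index, limit=12, nthreads=3):
    for i in range(index, limit, nthreads):
        with cond:
            # Block until it is this worker's turn.
            cond.wait_for(lambda: turn['value'] == index)
            printed.append(i)
            print('[worker-{}] - {}'.format(index, i))
            # Hand the turn to the next worker and wake everyone up
            # so that the right one can proceed.
            turn['value'] = (index + 1) % nthreads
            cond.notify_all()

threads = [threading.Thread(target=worker, args=(n,)) for n in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

The trade-off is that notify_all wakes every waiting thread on each step, whereas the three-condition version wakes only the one thread that is next in line.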

3. Managing resources using a Semaphore

So far we have looked at examples involving thread synchronization with respect to time, or enforcing a specific order among threads. The other class of synchronization problems in computing involves shared access to resources, which can be limited or unlimited.

In the following problem we look at one such example.

The problem involves implementing two threads. The first is a watcher thread, which observes changes in a given directory such as the creation, deletion or modification of files and directories. The second is an info thread, which prints this information to the standard output.

Here is the code for the WatcherThread class.

import os
import threading
import time
import operator

class WatcherThread(threading.Thread):
    """ Thread watching for changes in current folder """

    def __init__(self, paths, sema):
        self._paths = paths
        self._sema  = sema
        super().__init__(group=None)
        
    def run(self):

        # Dictionary mapping names to modified times
        first = {item:os.path.getmtime(item) for item in os.listdir('.')}
        
        while True:
            now = {item:os.path.getmtime(item) for item in os.listdir('.')}         
            if now != first:
                added = [item for item in now if item not in first]
                removed = [item for item in first if item not in now]
                if len(added):
                    for i in added:
                        self._paths.append((i, 'added'))
                if len(removed):
                    for i in removed:
                        self._paths.append((i, 'removed'))

                if len(added) == 0 and len(removed) == 0:
                    # Maybe something is modified ?
                    recent = sorted(now.items(), key=operator.itemgetter(1))[-1][0]
                    self._paths.append((recent, 'modified'))
                            
                # Release semaphore
                self._sema.release()

                # Switch the state
                first = now
                
            time.sleep(1)

A Semaphore keeps an internal counter which can never go below zero. The counter is decremented by acquiring the semaphore and incremented by releasing it; an acquire on a semaphore whose counter is zero blocks until another thread releases it. This makes it useful for controlling access to a limited resource from multiple threads.
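The counter behaviour can be observed directly with non-blocking acquires. This is only an illustrative sketch; the variable names are arbitrary and the initial value of 2 is chosen just for the demonstration.

```python
import threading

sema = threading.Semaphore(2)            # counter starts at 2

first = sema.acquire(blocking=False)     # counter 2 -> 1, succeeds
second = sema.acquire(blocking=False)    # counter 1 -> 0, succeeds
third = sema.acquire(blocking=False)     # counter is 0, so this fails
sema.release()                           # counter 0 -> 1
fourth = sema.acquire(blocking=False)    # counter 1 -> 0, succeeds

print(first, second, third, fourth)
```

With the default blocking=True, the third call would have waited for the release instead of returning False.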

The WatcherThread observes the current directory for any changes by keeping two dictionaries of its contents, first and now. It appends the changed file/directory names to a shared list named _paths and then releases the semaphore _sema.

Here is the code for the InfoThread class which prints the information that is sent by the WatcherThread. It shares the same semaphore and list as the watcher thread.

class InfoThread(threading.Thread):
    """ Thread printing information about recently modified files """

    def __init__(self, paths, sema):
        self._paths = paths
        self._sema = sema
        super().__init__(group=None)
        
    def run(self):
        while True:
            # Wait on semaphore
            self._sema.acquire()
            while len(self._paths):
                # pop(0) takes the oldest entry, so events print in the order recorded
                item, action = self._paths.pop(0)
                print(action, '=>', item)

Here is how to start both threads and get things going.

paths = []
sema = threading.Semaphore(0)
t1 = WatcherThread(paths, sema)
t2 = InfoThread(paths, sema)
    
t1.start()
t2.start()

A quick tour of the code.

  1. We initialize the semaphore to 0. This is shared by both the threads. The list named paths which will hold the changed file/directory information is shared as well.
  2. The InfoThread will block immediately on the semaphore after it calls acquire on it as the semaphore value is zero.
  3. The WatcherThread releases the semaphore after it appends changed paths to the paths list. This increments the semaphore count to 1. InfoThread is now unblocked and it prints information on each path till the list is empty and goes back to waiting on the semaphore. The semaphore value goes back to 0 and the process repeats.
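The signalling pattern described in these steps can be reduced to a few lines. In the sketch below, the functions info and watcher are simplified stand-ins for the two thread classes, and the list seen exists only so the result can be inspected.

```python
import threading

paths = []
sema = threading.Semaphore(0)   # starts at 0, so the first acquire blocks
seen = []

def info():
    sema.acquire()              # blocks until watcher() calls release
    while paths:
        seen.append(paths.pop(0))

def watcher():
    paths.append(('x', 'added'))
    paths.append(('y', 'added'))
    sema.release()              # counter 0 -> 1, unblocks info()

t = threading.Thread(target=info)
t.start()
watcher()
t.join()
print(seen)
```

The full WatcherThread and InfoThread classes follow the same pattern, only in endless loops.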

Here is the code in action showing some actions on the shell and the corresponding message by the threads.

$ echo "Testing" > x
$ cp x y                           
$ mv y z  
$ mkdir sub                        
$ cp z sub                         
$ rm -f z                          
added => x
added => y
removed => y
added => z
added => sub
modified => sub
removed => z

Astute readers may have observed that the code will run fine even if we don’t use a semaphore. In this case the InfoThread will run in a tight loop, checking the list length continuously in a busy-wait fashion. However, such busy-wait loops keep a CPU core busy while doing no useful work, so it is better to use blocking primitives instead.

NOTE: You can test this by commenting out the semaphore release and acquire lines of code and watch your system's CPU kick into high gear.

4. Using a queue to synchronize and share data between threads

The queue module in Python provides an interface to thread-safe queues which provide advanced synchronisation and data sharing between multiple threads. The simplest class provided by this module is the Queue which provides a queue with FIFO semantics.

Queues are ideal when threads work in a producer-consumer pattern - where one type of thread(s) produce data and the other type consume it. They come with built-in blocking and release semantics so usually you don’t need other synchronization primitives when using queues.
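As a minimal sketch of the producer-consumer pattern, the example below has one thread put numbers on a queue and another take them off. The SENTINEL value used to tell the consumer to stop is a convention assumed for this example, not something the queue module provides.

```python
import queue
import threading

q = queue.Queue()
consumed = []
SENTINEL = None    # assumed marker meaning "no more data"

def producer():
    for i in range(5):
        q.put(i)
    q.put(SENTINEL)

def consumer():
    while True:
        item = q.get()          # blocks while the queue is empty
        if item is SENTINEL:
            break
        consumed.append(item * item)

threads = [threading.Thread(target=producer),
           threading.Thread(target=consumer)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(consumed)
```

No lock, condition or semaphore appears in the code; the queue does the blocking and hand-off internally.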

Since our watcher and info threads work like a producer and a consumer respectively, they are well suited to a rewrite using a queue.

class WatcherThread(threading.Thread):
    """ Thread watching for changes in current folder """

    def __init__(self, q):
        self._q = q
        super().__init__(group=None)
        
    def run(self):

        # Dictionary mapping names to modified times
        first = {item:os.path.getmtime(item) for item in os.listdir('.')}
        
        while True:
            now = {item:os.path.getmtime(item) for item in os.listdir('.')}         
            if now != first:
                added = [item for item in now if item not in first]
                removed = [item for item in first if item not in now]
                if len(added):
                    for i in added:
                        self._q.put((i, 'added'))
                if len(removed):
                    for i in removed:
                        self._q.put((i, 'removed'))

                if len(added) == 0 and len(removed) == 0:
                    recent = sorted(now.items(), key=operator.itemgetter(1))[-1][0]
                    self._q.put((recent, 'modified'))
                            
                first = now
                
            time.sleep(1)

class InfoThread(threading.Thread):
    """ Thread printing information about recently modified files """

    def __init__(self, q):
        self._q = q
        super().__init__(group=None)
        
    def run(self):
        while True:
            # Block on the queue and get data
            item, action = self._q.get()
            print(action, '=>', item)

A quick run through the code.

  1. We replace both the shared list and the semaphore with a queue object, which is shared by both threads.
  2. WatcherThread acts as producer, putting data to the queue and InfoThread as consumer, getting data from it. Since the get call blocks when the queue is empty, there is no need for another synchronization construct.

This is how we start off things.

import queue

file_q = queue.Queue()
t1 = WatcherThread(file_q)
t2 = InfoThread(file_q)
    
t1.start()
t2.start()

You can verify that this runs exactly like the previous version by trying it out. However the code is much simpler as we were able to replace both the semaphore and the list by using the queue. Queue semantics work great for this problem.
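Both threads above loop forever. One way a consumer could give up when nothing arrives for a while, shown here only as a sketch, is to pass a timeout to get and catch queue.Empty. The 0.1 second timeout and the events list are arbitrary choices for the illustration.

```python
import queue

q = queue.Queue()
q.put(('x', 'added'))
q.put(('y', 'removed'))

events = []
while True:
    try:
        # Wait up to 0.1 seconds for the next item.
        item, action = q.get(timeout=0.1)
    except queue.Empty:
        # Nothing arrived in time, so stop consuming.
        break
    events.append((action, item))

print(events)
```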

