Today, we will explore the producer-consumer paradigm in multi-threaded programming a bit further to develop an interesting program - a simple crawler that will try to fetch your favourite movies' information from IMDB.

Problem

Implement two thread classes that use producer-consumer pattern to fetch movie information from IMDB - starting from a given URL and then going on to fetch related movies and their information.

Description

If you have ever tried to look for movie information on the web, you will know that IMDB is the gold mine of information about movies. When you visit a movie’s page on IMDB, it provides a list of related movies in a section titled “More Like This”. We will use this structure of an IMDB page to query such related movies and build a mini IMDB crawler.

The IMDB Fetcher Thread

The fetcher thread gets a movie’s IMDB web-page data, given the URL.

import requests
import time
import threading

class IMDBFetcher(threading.Thread):
    """ IMDB data fetcher thread """
    
    def __init__(self, url_q, data_q):
        self.url_q = url_q
        self.data_q = data_q
        super().__init__(group=None)

    def run(self):
        
        while True:
            url = self.url_q.get()
            if url is None:
                print('Quitting')
                break
                
            data = requests.get(url).content
            data = data.decode('utf-8')
            print('Fetched {} bytes from {}'.format(len(data), url))
            self.url_q.task_done()
            # Post to data queue 
            self.data_q.put((url, data))
            time.sleep(2)

        print(self,'quitting')

This class is fairly simple. Here is what it does.

  1. Accepts two queues in its initializer - a URL queue and a Data queue.
  2. Gets URLs from the first queue, downloads the data and posts the data to the second queue.

Any sizeable problem in computing involves I/O and processing. In this case, the task of fetching URL data from the web is the I/O task. This is done by the Fetcher class. The task of processing this data and extracting the structured information will be done by another thread - which we will call IMDBParser.

import time
import threading
import urllib.parse as urlparse
from bs4 import BeautifulSoup

class IMDBParser(threading.Thread):
    """ IMDB data parser and URL extractor thread """
    
    def __init__(self, url_q, data_q, limit=10):
        self.url_q = url_q
        self.data_q = data_q
        self.count = 0
        self.limit = limit
        # To avoid duplicates
        self.url_dict = {}
        super().__init__(group=None)
        
    def run(self):

        while self.count < self.limit:
            # Fetch data from Data queue
            url, data = self.data_q.get()
            soup = BeautifulSoup(data, "lxml")
            title = soup.find('h1').text.strip()
            
            self.count += 1
            # Parse and print movie title
            print('{}. {}, title: {}'.format(self.count, url, title))

            # Get related movies information
            recs = soup.find('div', attrs={'class': 'rec_page'})
            recs_items = recs.find_all('a')
            # Build absolute URLs
            urls = [urlparse.urljoin('https://imdb.com', link['href']) for link in recs_items]
            # Push URLs to URL queue
            for url in urls:
                if url in self.url_dict: continue
                self.url_q.put(url)
                self.url_dict[url] = 1

            time.sleep(2)

        print(self,'quitting')
        # Poison pill
        self.url_q.put(None)

The code comments should clarify what this class does. If not,

  1. This thread class accepts the same queues as the Fetcher thread.
  2. It waits for IMDB URL data on the data queue. The data is parsed by using the BeautifulSoup library and information such as title is extracted and printed.
  3. It also parses out related movie URLs from the data and posts the URLs back to the URL queue.

Information Flow and the Poison Pill

Let us take a brief look at the design of our classes, since we have two queues here.

  1. URL Queue - Parser pushes data to this queue and fetcher gets data from here. So the Parser class is this queue’s producer and Fetcher is its consumer.
  2. Data Queue - Fetcher pushes data to this queue and Parser pops it from there. So the roles reverse here. Fetcher is this queue’s producer and Parser is the consumer.

The design creates a circular feedback loop between the two threads via the queues. This means that as long as IMDB keeps providing related movies for a URL, we will keep crawling without end - until perhaps we circle back to some of the earlier URLs. We need a way to limit the crawl.

The Parser accepts a limit parameter for this. It processes at most that many movies, and quits its loop once the limit is reached.

Take a look at how the Parser class pushes None into the URL queue as a magic value just before it quits. This is a special value placed by the queue's producer to let the consumer know that this is the Endgame for the threads. The consumer reads this value and stops its processing. In our case, the Fetcher class breaks its loop when it sees that the url is None. This technique is known as the poison pill.
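The poison pill works for any producer-consumer pair built on a queue, not just our crawler. Here is a minimal, self-contained sketch of the idea (the names `worker`, `jobs` and `results`, and the doubling "work", are purely illustrative):

```python
import queue
import threading

def worker(jobs, results):
    """Consume jobs until the poison pill (None) arrives."""
    while True:
        job = jobs.get()
        if job is None:           # the poison pill - stop consuming
            break
        results.put(job * 2)      # stand-in for real processing

jobs, results = queue.Queue(), queue.Queue()
t = threading.Thread(target=worker, args=(jobs, results))
t.start()

for n in (1, 2, 3):
    jobs.put(n)
jobs.put(None)                    # producer signals the end
t.join()

doubled = [results.get() for _ in range(3)]
print(doubled)  # -> [2, 4, 6]
```

Because a single consumer drains a FIFO queue, the results come out in order, and the thread is guaranteed to have exited by the time `join()` returns.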

Lights, Camera, Action

Let us get our actors to act out now. They have been prepared well for their respective roles and should play the part.

To start things off,

import queue

url_q = queue.Queue()
data_q = queue.Queue()

parser = IMDBParser(url_q, data_q)
fetcher = IMDBFetcher(url_q, data_q)

fetcher.start()
parser.start()

# Push a URL into URL queue
url_q.put('https://www.imdb.com/title/tt4154756/?ref_=ttls_li_tt')

# Wait for the threads to finish acting
parser.join()
fetcher.join()

And, action!

Fetched 261926 bytes from https://www.imdb.com/title/tt4154756/?ref_=ttls_li_tt
1. https://www.imdb.com/title/tt4154756/?ref_=ttls_li_tt, title: Avengers: Infinity War (2018)
Fetched 255123 bytes from https://imdb.com/title/tt3501632/
2. https://imdb.com/title/tt3501632/, title: Thor: Ragnarok (2017)
Fetched 257946 bytes from https://imdb.com/title/tt2395427/
3. https://imdb.com/title/tt2395427/, title: Avengers: Age of Ultron (2015)
Fetched 264643 bytes from https://imdb.com/title/tt0848228/
4. https://imdb.com/title/tt0848228/, title: The Avengers (2012)
Fetched 264697 bytes from https://imdb.com/title/tt3498820/
5. https://imdb.com/title/tt3498820/, title: Captain America: Civil War (2016)
Fetched 260896 bytes from https://imdb.com/title/tt4154796/
6. https://imdb.com/title/tt4154796/, title: Avengers: Endgame (2019)
Fetched 243967 bytes from https://imdb.com/title/tt1825683/
7. https://imdb.com/title/tt1825683/, title: Black Panther (2018)
Fetched 243919 bytes from https://imdb.com/title/tt2250912/
8. https://imdb.com/title/tt2250912/, title: Spider-Man: Homecoming (2017)
Fetched 246522 bytes from https://imdb.com/title/tt3896198/
9. https://imdb.com/title/tt3896198/, title: Guardians of the Galaxy Vol. 2 (2017)
Fetched 262719 bytes from https://imdb.com/title/tt2015381/
10. https://imdb.com/title/tt2015381/, title: Guardians of the Galaxy (2014)
<IMDBParser(Thread-1, started 140005982398208)> quitting
Fetched 260695 bytes from https://imdb.com/title/tt0458339/
Fetched 258911 bytes from https://imdb.com/title/tt1843866/
Fetched 243679 bytes from https://imdb.com/title/tt0371746/
Fetched 261960 bytes from https://imdb.com/title/tt4154756/
Fetched 257067 bytes from https://imdb.com/title/tt1211837/
Quitting
<IMDBFetcher(Thread-2, started 140005990790912)> quitting

That worked well.

Fetcher Lag

If you look at the last few lines, you will see,

Fetched 260695 bytes from https://imdb.com/title/tt0458339/
Fetched 258911 bytes from https://imdb.com/title/tt1843866/
Fetched 243679 bytes from https://imdb.com/title/tt0371746/
Fetched 261960 bytes from https://imdb.com/title/tt4154756/
Fetched 257067 bytes from https://imdb.com/title/tt1211837/
Quitting
<IMDBFetcher(Thread-2, started 140005990790912)> quitting

The Fetcher thread went on to fetch the pages of about five more movies that were never processed - the limit had been reached and the Parser had already quit. This is because the Parser produces several URLs in each iteration of its loop, while the Fetcher consumes only one per iteration. So the Fetcher takes time to bite the poison pill, as there are always some URLs queued up ahead of it.
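The imbalance is easy to see with a plain queue and no threads at all - one "parser" iteration pushes several URLs while one "fetcher" iteration pops just one (the count of five and the tt-number URLs below are purely illustrative):

```python
import queue

url_q = queue.Queue()

# One "parser" iteration: push five related-movie URLs
for i in range(5):
    url_q.put('https://imdb.com/title/tt{:07d}/'.format(i))

# One "fetcher" iteration: pop a single URL
url_q.get()

# Four URLs are still queued ahead of any poison pill
backlog = url_q.qsize()
print(backlog)  # -> 4
```

The backlog grows by roughly the fan-out per page on every cycle, which is why the poison pill always trails a few unprocessed URLs.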

Is there a way to make the Fetcher break character faster? Yes - one can use Events for this.

Event - The Game Changer

An Event object allows one thread to signal another, waiting thread. An Event has two states - one in which its internal flag is set and the other in which it is cleared. In the latter state, any thread waiting on the event will block till it is signalled by another thread moving the event back to the set state.
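The two outcomes of `Event.wait` are worth seeing in isolation: it returns `False` if the timeout expires with the flag still cleared, and `True` as soon as the flag is set (the 0.2 s and 0.1 s delays below are arbitrary):

```python
import threading

event = threading.Event()

# Nobody sets the flag: wait() blocks for the timeout, then returns False
before_set = event.wait(timeout=0.2)
print(before_set)  # -> False

# A timer thread sets the flag shortly; wait() now returns True early,
# well before its 5-second timeout
threading.Timer(0.1, event.set).start()
after_set = event.wait(timeout=5)
print(after_set)  # -> True
```

This return value is exactly what the rewritten Fetcher below checks to decide whether to keep going or to quit.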

Here our thread actors have rewritten their act to use an Event to change their Endgame.

The Fetcher.

class IMDBFetcher(threading.Thread):
    """ IMDB data fetcher thread """
    
    def __init__(self, url_q, data_q, q_event):
        self.url_q = url_q
        self.data_q = data_q
        self.q_event = q_event
        super().__init__(group=None)

    def run(self):
        
        while True:
            # If times out break
            if not self.q_event.wait(timeout=10):
                print(self,'timed out, quitting')
                break

            url = self.url_q.get()
            data = requests.get(url).content
            data = data.decode('utf-8')
            print('Fetched {} bytes from {}'.format(len(data), url))
            self.url_q.task_done()
            # Post to data queue 
            self.data_q.put((url, data))
            time.sleep(2)               

        print(self,'quitting')          

The Parser.

class IMDBParser(threading.Thread):
    """ IMDB data parser and URL extractor thread """
    
    def __init__(self, url_q, data_q, q_event, limit=10):
        self.url_q = url_q
        self.data_q = data_q
        self.q_event = q_event
        self.count = 0
        self.limit = limit
        # To avoid duplicates
        self.url_dict = {}
        super().__init__(group=None)
        
    def run(self):

        while self.count < self.limit:
            self.q_event.clear()
            url, data = self.data_q.get()
            soup = BeautifulSoup(data, "lxml")
            title = soup.find('h1').text.strip()
            
            self.count += 1
            print('{}. {}, title: {}'.format(self.count, url, title))
            
            recs = soup.find('div', attrs={'class': 'rec_page'})
            recs_items = recs.find_all('a')
            urls = [urlparse.urljoin('https://imdb.com', link['href']) for link in recs_items]
            for url in urls:
                if url in self.url_dict: continue
                self.url_q.put(url)
                self.url_dict[url] = 1

            # Wake the thread Up
            self.q_event.set()
            time.sleep(2)

        self.q_event.clear()
        print(self,'quitting')

Hopefully the logic is clear (unlike some movies). If not,

  1. The Fetcher always blocks on the event, with a timeout, before it waits on the URL queue. If the wait times out, it breaks its loop and ends its game.
  2. The Parser clears the event at the beginning of each iteration and sets it at the end, after it has pushed URLs to the URL queue - indicating to the Fetcher that everything is set, quite literally of course.
  3. The Parser clears the event as its final act before calling it quits. This causes the Fetcher's wait to time out, signalling it to quit its loop early.

And thence to action …

url_q = queue.Queue()
data_q = queue.Queue()
event = threading.Event()

# Set event to kick-off action
event.set()

parser = IMDBParser(url_q, data_q, event)
fetcher = IMDBFetcher(url_q, data_q, event)

# Action!
fetcher.start()
parser.start()

# Push a URL into URL queue
url_q.put('https://www.imdb.com/title/tt4154756/?ref_=ttls_li_tt')

# Endgame
parser.join()
fetcher.join()

The results.

Fetched 262137 bytes from https://www.imdb.com/title/tt4154756/?ref_=ttls_li_tt
1. https://www.imdb.com/title/tt4154756/?ref_=ttls_li_tt, title: Avengers: Infinity War (2018)
Fetched 250292 bytes from https://imdb.com/title/tt3501632/
2. https://imdb.com/title/tt3501632/, title: Thor: Ragnarok (2017)
Fetched 262903 bytes from https://imdb.com/title/tt2395427/
3. https://imdb.com/title/tt2395427/, title: Avengers: Age of Ultron (2015)
Fetched 256625 bytes from https://imdb.com/title/tt0848228/
4. https://imdb.com/title/tt0848228/, title: The Avengers (2012)
Fetched 262751 bytes from https://imdb.com/title/tt3498820/
5. https://imdb.com/title/tt3498820/, title: Captain America: Civil War (2016)
Fetched 260984 bytes from https://imdb.com/title/tt4154796/
6. https://imdb.com/title/tt4154796/, title: Avengers: Endgame (2019)
Fetched 246792 bytes from https://imdb.com/title/tt1825683/
7. https://imdb.com/title/tt1825683/, title: Black Panther (2018)
Fetched 260492 bytes from https://imdb.com/title/tt2250912/
8. https://imdb.com/title/tt2250912/, title: Spider-Man: Homecoming (2017)
Fetched 259301 bytes from https://imdb.com/title/tt3896198/
9. https://imdb.com/title/tt3896198/, title: Guardians of the Galaxy Vol. 2 (2017)
Fetched 259216 bytes from https://imdb.com/title/tt2015381/
10. https://imdb.com/title/tt2015381/, title: Guardians of the Galaxy (2014)
<IMDBParser(Thread-1, started 140655438423808)> quitting
Fetched 264795 bytes from https://imdb.com/title/tt0458339/
<IMDBFetcher(Thread-2, started 140655446816512)> timed out, quitting
<IMDBFetcher(Thread-2, started 140655446816512)> quitting

You can see that the Fetcher just fetched one extra URL before quitting. Much better!

The relative sleep times of the threads should be matched carefully to make sure a thread does not time out too early or too late. Note how the threads sleep a bit after processing (2 seconds). The wait timeout on the Event should be a few multiples of this value, so that a wait never times out merely because the Parser is in its normal between-iterations sleep.
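The constraint can be sketched with two timers standing in for the Parser's signalling (the 0.4 s "sleep" and the two timeout values are illustrative, not the article's 2 s / 10 s figures): a timeout shorter than the producer's delay fires spuriously, while one a few multiples larger does not.

```python
import threading

def signal_after(delay):
    """Return an Event that a helper thread will set after `delay` seconds."""
    event = threading.Event()
    threading.Timer(delay, event.set).start()
    return event

# The "producer" signals every 0.4 s; a 0.1 s timeout gives up too early...
too_short = signal_after(0.4).wait(timeout=0.1)

# ...while a timeout of a few multiples of the delay succeeds.
long_enough = signal_after(0.4).wait(timeout=2.0)

print(too_short, long_enough)  # -> False True
```

In the crawler, the same reasoning gives the 10-second timeout against the 2-second sleeps: roughly a 5x margin.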

And cut.

