Tuesday, December 18

Thread Carefully: An Introduction To Concurrent Python

The ability to execute code in parallel is crucial in a wide variety of scenarios. Concurrent programming is a key asset for web servers, producer/consumer models, batch number-crunching and pretty much any time an application is bottlenecked by a resource.

Writing quality concurrent code can be a real headache, but this article aims to demonstrate how easy it is to get started writing threaded programs in Python. The standard library ships several modules built for exactly this kind of work, so simple concurrent tasks are often surprisingly quick to implement.

We’ll walk through the difference between threads and processes in a Python context, before reviewing some of the different approaches you can take and what they’re best suited for.

(Python 3 is used for the duration of the article.)

The Global Interpreter Lock

It’s impossible to talk about concurrent programming in Python without mentioning the Global Interpreter Lock, or GIL, because it has a large impact on which approach you select when writing concurrent Python. The most important thing to note is that it is a feature of CPython (the widely used “reference” Python implementation), not of the language itself. Jython and IronPython, among other implementations, have no GIL.

The GIL is controversial because it allows only one thread at a time to execute Python bytecode. This means that threads often cannot take advantage of multi-core systems. If the blocking operations happen outside the interpreter, as with long waits for I/O, then the GIL is not a bottleneck and writing a threaded program will still be a benefit. However, if the time is largely spent crunching through CPython bytecode, then the GIL becomes a bottleneck.

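Why was the GIL introduced at all? It makes memory management much simpler, since the interpreter’s internal state (such as reference counts) cannot be modified by two threads at once, and it makes C extensions easier to write and easier to wrap. Note that it protects only the interpreter’s internals; your own code can still contain race conditions.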

The upshot of all this is that if you need true parallelism and need to leverage multi-core CPUs, threads won’t cut it and you need to use processes. A separate process means a separate interpreter with separate memory, its own GIL, and true parallelism. This guide will give examples of both thread and process architectures.
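
To make the distinction concrete, here is a minimal sketch that times the same four tasks run sequentially and then in four threads. The names io_task, cpu_task and timed, along with the workload sizes, are made up for illustration, and the timings will vary by machine. On a standard CPython build you would typically expect the sleeping tasks to overlap in threads while the number-crunching tasks do not.

```python
"""Compare threads on I/O-style waiting versus CPU-bound work."""
import threading
import time


def io_task():
    """Stand in for a blocking I/O call; sleeping releases the GIL."""
    time.sleep(0.2)


def cpu_task():
    """Pure-Python arithmetic, which holds the GIL while it runs."""
    total = 0
    for i in range(200_000):
        total += i * i
    return total


def timed(task, use_threads, count=4):
    """Run `task` `count` times and return the elapsed seconds."""
    start = time.perf_counter()
    if use_threads:
        threads = [threading.Thread(target=task) for _ in range(count)]
        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()
    else:
        for _ in range(count):
            task()
    return time.perf_counter() - start


if __name__ == '__main__':
    for task in (io_task, cpu_task):
        sequential = timed(task, use_threads=False)
        threaded = timed(task, use_threads=True)
        print(f"{task.__name__}: sequential {sequential:.2f}s, "
              f"threaded {threaded:.2f}s")
```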

The concurrent.futures module

The concurrent.futures module is less well known than it deserves to be, but it provides a simple way to run tasks in threads or processes. For many basic applications, the easy-to-use pool (executor) interface offered here is sufficient.

Here’s an example where we want to download some webpages, which will be much quicker if done in parallel.

"""Download webpages in threads."""
import requests
from concurrent.futures import ThreadPoolExecutor

download_list = [
    {'name': 'google', 'url': "http://google.com"},
    {'name': 'reddit', 'url': "http://reddit.com"},
    {'name': 'ebay', 'url': "http://ebay.com"},
    {'name': 'bbc', 'url': "http://bbc.co.uk"}
]


def download_page(page_info):
    """Download and save webpage."""
    r = requests.get(page_info['url'])
    with open(page_info['name'] + '.html', 'w') as save_file:
        save_file.write(r.text)


if __name__ == '__main__':
    pool = ThreadPoolExecutor(max_workers=10)

    for download in download_list:
        pool.submit(download_page, download)

Most of the code is just setting up our downloader example; it’s only the last block which contains the threading-specific code. Note how easy it is to create a dynamic pool of workers using ThreadPoolExecutor and submit a task. We could even simplify the last two lines to one using map:

    pool.map(download_page, download_list)

Using threads works well in this case since the blocking operation that benefits from concurrency is the act of fetching the webpage. This means that the GIL is not an issue and threading is an ideal solution. However, if the operation in question were CPU intensive within Python, processes would likely be more appropriate because of the restrictions of the GIL. In that case, we could simply have swapped ThreadPoolExecutor for ProcessPoolExecutor.
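
As a rough illustration of that swap, the sketch below passes the executor class in as a parameter, so the same CPU-bound job can run on either kind of pool. The names count_primes and run_all are hypothetical and the prime-counting workload is just a stand-in for CPU-heavy work. It also shows one way to collect return values, which the downloader above never needed.

```python
"""Run the same CPU-bound job on either kind of executor."""
from concurrent.futures import (ProcessPoolExecutor, ThreadPoolExecutor,
                                as_completed)


def count_primes(limit):
    """Count the primes below `limit` by trial division (CPU-bound)."""
    count = 0
    for candidate in range(2, limit):
        if all(candidate % d for d in range(2, int(candidate ** 0.5) + 1)):
            count += 1
    return count


def run_all(executor_class, limits):
    """Submit one job per limit and return {limit: prime_count}."""
    results = {}
    with executor_class(max_workers=4) as pool:
        # Map each future back to the input that produced it
        futures = {pool.submit(count_primes, limit): limit
                   for limit in limits}
        for future in as_completed(futures):
            # result() re-raises any exception raised in the worker
            results[futures[future]] = future.result()
    return results


if __name__ == '__main__':
    limits = [20_000, 30_000, 40_000]
    print(run_all(ThreadPoolExecutor, limits))
    print(run_all(ProcessPoolExecutor, limits))
```

Calling result() on each future matters beyond collecting values: an exception inside a task submitted with submit() stays hidden until the result is requested.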

The threading module

Whilst the concurrent.futures module offers a great way to get off the ground quickly, sometimes more control is needed over different threads, which is where the ubiquitous threading module comes in.

Let’s re-implement the website downloader we made above, this time using the threading module.

"""Download webpages in threads, using `threading`."""
import requests
import time
import threading

download_list = [
    {'name': 'google', 'url': "http://google.com"},
    {'name': 'reddit', 'url': "http://reddit.com"},
    {'name': 'ebay', 'url': "http://ebay.com"},
    {'name': 'bbc', 'url': "http://bbc.co.uk"}
]


def status_update():
    """Print 'Still downloading' at regular intervals."""
    while True:
        print("Still downloading")
        time.sleep(0.1)


def download_page(page_info):
    """Download and save webpage."""
    r = requests.get(page_info['url'])
    with open(page_info['name'] + '.html', 'w') as save_file:
        save_file.write(r.text)


if __name__ == '__main__':
    for download in download_list:
        downloader = threading.Thread(target=download_page, args=(download,))
        downloader.start()

    status = threading.Thread(target=status_update)
    status.start()

For each thread we want to create, we make an instance of the threading.Thread class, specifying what we would like our worker function to be, and the arguments required.

Note that we’ve also added a status update thread. The purpose of this is to repeatedly print “Still downloading” until we’ve finished fetching all the web pages. Unfortunately, since Python waits for all threads to finish executing before it exits, the program will never exit and the status updater thread will never stop printing.

This is an example of when the threading module’s multitude of options could be useful: we can mark the updater thread as a daemon thread, which means that Python will exit when only daemon threads are left running.

    status = threading.Thread(target=status_update)
    status.daemon = True
    status.start()

The program now successfully stops printing and exits when all downloader threads are finished.

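Daemon threads are generally most useful for background tasks and repetitive functions which only matter while the main program is running. Because a daemon thread can be killed at any moment when the program exits, it should not be trusted with work that must run to completion, such as writing a file, or data may be lost.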
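
When a background thread does need to finish cleanly, one alternative to a daemon is to tell it when to stop. The sketch below assumes a threading.Event as the stop signal and waits for the downloaders with join(). The fake_download function and its timing are hypothetical stand-ins for the real download, so that the example runs without network access.

```python
"""Stop a status thread explicitly instead of marking it as a daemon."""
import threading
import time


def status_update(stop_event):
    """Print a status line until `stop_event` is set."""
    # wait() returns True as soon as the event is set, ending the loop
    while not stop_event.wait(timeout=0.1):
        print("Still downloading")


def fake_download(name):
    """Hypothetical stand-in for download_page(); just sleeps briefly."""
    time.sleep(0.3)


def run_downloads(names):
    """Run all downloads, then shut the status thread down cleanly."""
    stop_event = threading.Event()
    status = threading.Thread(target=status_update, args=(stop_event,))
    status.start()

    downloaders = [threading.Thread(target=fake_download, args=(name,))
                   for name in names]
    for downloader in downloaders:
        downloader.start()
    for downloader in downloaders:
        downloader.join()  # block until this download has finished

    stop_event.set()  # tell the status thread to stop
    status.join()
    return status.is_alive()


if __name__ == '__main__':
    run_downloads(['google', 'reddit', 'ebay', 'bbc'])
```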

The combination of threading and queues

So far we’ve only looked at cases where we know exactly what we want the threads to be working on when we start them. However, it’s often the case that we need to start a group of worker threads, then feed them tasks as they arrive.

The best data structure for dealing with these tasks is, of course, a queue, and Python provides a queue module which is especially geared towards threading applications. FIFO, LIFO and priority queues are available.

Using a queue.Queue object to add, get, and mark tasks as done is as simple as this:

from queue import Queue

# maxsize=0 means no size limit
tasks = Queue(maxsize=0)

tasks.put("a task")
tasks.put("another task")

while not tasks.empty():
    print(tasks.get())
    # execute task
    tasks.task_done()
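
The other two queue types share the same interface and differ only in retrieval order. Here is a small sketch comparing all three; the helpers drain and fill_and_drain are hypothetical, and the (priority, name) tuples are just one common way to feed a PriorityQueue, where lower numbers come out first.

```python
"""Compare the retrieval order of the three queue types."""
from queue import LifoQueue, PriorityQueue, Queue


def drain(task_queue):
    """Remove and return every item currently in the queue."""
    items = []
    while not task_queue.empty():
        items.append(task_queue.get())
    return items


def fill_and_drain(queue_class, items):
    """Put `items` into a new queue of the given class, then drain it."""
    task_queue = queue_class()
    for item in items:
        task_queue.put(item)
    return drain(task_queue)


if __name__ == '__main__':
    # (priority, task) tuples: lower numbers are retrieved first
    tasks = [(2, 'reddit'), (1, 'google'), (3, 'ebay')]
    for queue_class in (Queue, LifoQueue, PriorityQueue):
        print(queue_class.__name__, fill_and_drain(queue_class, tasks))
```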

Ok, that’s pretty basic so far. Now let’s use it to create a tasks queue for our website downloader. We’ll create a group of worker threads which can all access the queue and wait for tasks to come in.

"""Download webpages in threads, using `threading` and `queue`."""
import requests
import threading
from queue import Queue

NUM_WORKER_THREADS = 3


def download_page(page_info):
    """Download and save webpage."""
    r = requests.get(page_info['url'])
    with open(page_info['name'] + '.html', 'w') as save_file:
        save_file.write(r.text)


def handle_tasks(tasks_queue):
    """Monitor tasks queue and execute tasks as appropriate."""
    while True:
        download_page(tasks_queue.get())
        tasks_queue.task_done()


if __name__ == '__main__':
    tasks = Queue(maxsize=0)

    # Create and start worker threads
    for i in range(NUM_WORKER_THREADS):
        worker = threading.Thread(target=handle_tasks, args=(tasks,))
        worker.daemon = True
        worker.start()

    # Add some tasks to the queue
    tasks.put({'name': 'google', 'url': "http://google.com"})
    tasks.put({'name': 'reddit', 'url': "http://reddit.com"})
    tasks.put({'name': 'ebay', 'url': "http://ebay.com"})
    tasks.put({'name': 'bbc', 'url': "http://bbc.co.uk"})

    tasks.join()

Note that in this example all the tasks were added in one go for the sake of brevity, but in a real application the tasks could trickle in at any rate. Here we exit the program once every queued task has been marked as complete with task_done(), which is what the queue’s .join() method waits for.
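
Daemon workers are killed wherever they happen to be when the program exits. If the workers should instead finish and return cleanly, one common pattern is to queue a sentinel value for each of them. The sketch below assumes None as the sentinel and replaces the download with a trivial string operation so that it runs offline; run and STOP are hypothetical names.

```python
"""Shut worker threads down cleanly with a sentinel value."""
import threading
from queue import Queue

NUM_WORKER_THREADS = 3
STOP = None  # sentinel: a worker exits when it receives this


def handle_tasks(tasks_queue, results):
    """Process tasks until the STOP sentinel arrives."""
    while True:
        task = tasks_queue.get()
        if task is STOP:
            tasks_queue.task_done()
            break
        # Stand-in for download_page(task); list.append is safe to
        # call from several threads in CPython
        results.append(task.upper())
        tasks_queue.task_done()


def run(task_names):
    """Process every task, stop the workers, and return the results."""
    tasks = Queue()
    results = []
    workers = [threading.Thread(target=handle_tasks, args=(tasks, results))
               for _ in range(NUM_WORKER_THREADS)]
    for worker in workers:
        worker.start()

    for name in task_names:
        tasks.put(name)
    # One sentinel per worker, queued after the real tasks
    for _ in workers:
        tasks.put(STOP)

    for worker in workers:
        worker.join()
    return sorted(results)


if __name__ == '__main__':
    print(run(['google', 'reddit', 'ebay', 'bbc']))
```

Because the sentinels are queued behind the real tasks, every task is picked up before any worker exits, and joining the workers replaces the call to the queue's join().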

The multiprocessing module

The threading module is great for detailed control of threads, but what if we want this finer level of control for processes? You might think that this would be more challenging since once a process is launched, it’s completely separate and independent – harder to control than a new thread which remains within the current interpreter and memory space.

Fortunately for us, the Python developers worked hard to give the multiprocessing module an interface that is almost identical to that of the threading module. This means that launching processes follows nearly the same syntax as our examples above. Our simple downloader would become this:

"""Download webpages in processes, using `multiprocessing`."""
import requests
import time
import multiprocessing

download_list = [
    {'name': 'google', 'url': "http://google.com"},
    {'name': 'reddit', 'url': "http://reddit.com"},
    {'name': 'ebay', 'url': "http://ebay.com"},
    {'name': 'bbc', 'url': "http://bbc.co.uk"}
]


def status_update():
    """Print 'Still downloading' at regular intervals."""
    while True:
        print("Still downloading")
        time.sleep(0.1)


def download_page(page_info):
    """Download and save webpage."""
    r = requests.get(page_info['url'])
    with open(page_info['name'] + '.html', 'w') as save_file:
        save_file.write(r.text)


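if __name__ == '__main__':
    downloaders = []
    for download in download_list:
        downloader = multiprocessing.Process(target=download_page,
                                             args=(download,))
        downloader.start()
        downloaders.append(downloader)

    status = multiprocessing.Process(target=status_update)
    status.daemon = True
    status.start()

    # Unlike daemon threads, daemon processes are terminated as soon as the
    # main process begins to exit, so wait for the downloads explicitly.
    for downloader in downloaders:
        downloader.join()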

We think it’s awesome that Python manages to keep the same syntax between the threading and multiprocessing modules, when the action taking place under the hood is so different.

When it comes to distributing data between processes, the queue.Queue that we used for threading will not work between processes. This is because a queue.Queue is fundamentally just a data structure within the current process – albeit one which is cleverly locked and mutexed. Thankfully there exists a multiprocessing.Queue, which is specifically designed for inter-process communication. Behind the scenes, this will serialize your data and send it through a pipe between processes – a very convenient abstraction.
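
A minimal sketch of that idea follows, assuming one worker process, a pair of queues (one per direction) and None as a stop sentinel. The names square_worker, inbox and outbox are hypothetical, and squaring numbers stands in for real work.

```python
"""Pass work to a child process and back with multiprocessing.Queue."""
import multiprocessing

STOP = None  # sentinel telling the worker to exit


def square_worker(inbox, outbox):
    """Square each number from `inbox` until the STOP sentinel arrives."""
    while True:
        number = inbox.get()
        if number is STOP:
            break
        outbox.put(number * number)


if __name__ == '__main__':
    inbox = multiprocessing.Queue()
    outbox = multiprocessing.Queue()
    worker = multiprocessing.Process(target=square_worker,
                                     args=(inbox, outbox))
    worker.start()

    numbers = [1, 2, 3]
    for number in numbers:
        inbox.put(number)
    inbox.put(STOP)

    # Collect the results before join() so the worker can flush its queue
    print([outbox.get() for _ in numbers])
    worker.join()
```

Everything placed on these queues is pickled, so tasks and results must be picklable objects.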

Summary

Writing concurrent code in Python can be a lot of fun because the standard library abstracts away a lot of problems. This doesn’t mean that a detailed level of control cannot be achieved, but rather that the barrier to getting started with simple tasks is lowered. So when you’re stuck waiting for one task to finish before starting the next, give one of these techniques a try.
