The ability to execute code in parallel is crucial in a wide variety of scenarios. Concurrent programming is a key asset for web servers, producer/consumer models, batch number-crunching and pretty much any time an application is bottlenecked waiting on a resource.
Writing quality concurrent code can sadly be a real headache, but this article aims to demonstrate how easy it is to get started writing threaded programs in Python. Thanks to the number of standard library modules designed to help with exactly this, simple concurrent tasks are often surprisingly quick to implement.
We’ll walk through the difference between threads and processes in a Python context, before reviewing some of the different approaches you can take and what they’re best suited for.
(Python 3 is used throughout this article.)
The Global Interpreter Lock
It’s impossible to talk about concurrent programming in Python without mentioning the Global Interpreter Lock, or GIL, because of the large impact it has on which approach you select when writing asynchronous Python. The most important thing to note is that the GIL is only a feature of CPython (the widely used “reference” Python implementation); it is not a feature of the language itself. Jython and IronPython, among other implementations, have no GIL.
The GIL is controversial because it allows only one thread at a time to execute Python bytecode. This means that threads often cannot take advantage of multi-core systems. Note that if the blocking operations happen outside Python, long-wait tasks like I/O for instance, then the GIL is not a bottleneck and a threaded program will still benefit. However, if the work is mostly CPU-bound, crunching through CPython bytecode, then the GIL becomes a bottleneck.
Why was the GIL introduced at all? It makes memory management much simpler with no possibility of simultaneous access or race conditions, and it makes C extensions easier to write and easier to wrap.
The upshot of all this is that if you need true parallelism and need to leverage multi-core CPUs, threads won’t cut it and you need to use processes. A separate process means a separate interpreter with separate memory, its own GIL, and true parallelism. This guide will give examples of both thread and process architectures.
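To make this concrete, here’s a minimal sketch (the countdown function and iteration count are illustrative choices of ours, not from any benchmark) showing that threads don’t speed up CPU-bound, pure-Python work on CPython:

"""Illustrative sketch: threads vs. sequential for CPU-bound work."""
import time
import threading

def countdown(n):
    """Pure-Python busy loop; holds the GIL while it runs."""
    while n > 0:
        n -= 1

N = 10000000

if __name__ == '__main__':
    # Sequential: run the countdown twice, one after the other
    start = time.time()
    countdown(N)
    countdown(N)
    print("sequential:", time.time() - start)

    # Threaded: on CPython, expect a similar (or worse) wall time,
    # because only one thread executes Python bytecode at a time
    start = time.time()
    t1 = threading.Thread(target=countdown, args=(N,))
    t2 = threading.Thread(target=countdown, args=(N,))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print("threaded:", time.time() - start)

Swap the threads for multiprocessing.Process workers and the two countdowns can genuinely run in parallel on separate cores.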
The concurrent.futures module
The concurrent.futures module is a well-kept secret in Python, but it provides a uniquely simple way to implement threads and processes. For many basic applications, the easy-to-use pool executors it offers are sufficient.
Here’s an example where we want to download some webpages, which will be much quicker if done in parallel.
"""Download webpages in threads.""" import requests from concurrent.futures import ThreadPoolExecutor download_list = [ {'name': 'google', 'url': "http://google.com"}, {'name': 'reddit', 'url': "http://reddit.com"}, {'name': 'ebay', 'url': "http://ebay.com"}, {'name': 'bbc', 'url': "http://bbc.co.uk"} ] def download_page(page_info): """Download and save webpage.""" r = requests.get(page_info['url']) with open(page_info['name'] + '.html', 'w') as save_file: save_file.write(r.text) if __name__ == '__main__': pool = ThreadPoolExecutor(max_workers=10) for download in download_list: pool.submit(download_page, download)
Most of the code is just setting up our downloader example; it’s only the last block which contains the threading-specific code. Note how easy it is to create a dynamic pool of workers using ThreadPoolExecutor and submit a task. We could even simplify the last two lines to one using map:
pool.map(download_page, download_list)
Using threads works well in this case since the blocking operation that benefits from concurrency is the act of fetching the webpage. This means that the GIL is not an issue and threading is an ideal solution. However, if the operation in question were something CPU-intensive within Python, processes would likely be more appropriate because of the restrictions of the GIL. In that case, we could simply switch out ThreadPoolExecutor for ProcessPoolExecutor, as sketched below.
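As a minimal sketch of that swap (crunch_numbers is a hypothetical stand-in for real CPU-bound work, not part of the example above):

"""Sketch: processes instead of threads for CPU-bound work."""
from concurrent.futures import ProcessPoolExecutor

def crunch_numbers(n):
    """Hypothetical CPU-bound task: sum of squares in pure Python."""
    return sum(i * i for i in range(n))

if __name__ == '__main__':
    # Same interface as ThreadPoolExecutor, but each worker is a
    # separate process with its own interpreter and its own GIL
    with ProcessPoolExecutor(max_workers=4) as pool:
        results = list(pool.map(crunch_numbers, [10 ** 6] * 4))
    print(results)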
The threading module
Whilst the concurrent.futures module offers a great way to get off the ground quickly, sometimes more control is needed over individual threads, which is where the ubiquitous threading module comes in.
Let’s re-implement the website downloader we made above, this time using the threading module.
"""Download webpages in threads, using `threading`.""" import requests import time import threading download_list = [ {'name': 'google', 'url': "http://google.com"}, {'name': 'reddit', 'url': "http://reddit.com"}, {'name': 'ebay', 'url': "http://ebay.com"}, {'name': 'bbc', 'url': "http://bbc.co.uk"} ] def status_update(): """Print 'Still downloading' at regular intervals.""" while True: print("Still downloading") time.sleep(0.1) def download_page(page_info): """Download and save webpage.""" r = requests.get(page_info['url']) with open(page_info['name'] + '.html', 'w') as save_file: save_file.write(r.text) if __name__ == '__main__': for download in download_list: downloader = threading.Thread(target=download_page, args=(download,)) downloader.start() status = threading.Thread(target=status_update) status.start()
For each thread we want to create, we make an instance of the threading.Thread class, specifying the worker function we’d like it to run and the arguments it requires.
Note that we’ve also added a status update thread. The purpose of this is to repeatedly print “Still downloading” until we’ve finished fetching all the web pages. Unfortunately, since Python waits for all threads to finish executing before it exits, the program will never exit and the status updater thread will never stop printing.
This is an example of where the threading module’s multitude of options comes in useful: we can mark the updater thread as a daemon thread, which means that Python will exit once only daemon threads are left running.
status = threading.Thread(target=status_update)
status.daemon = True
status.start()
The program now successfully stops printing and exits when all downloader threads are finished.
Daemon threads are generally most useful for background tasks and repetitive functions which are only required while the main program is running; since a daemon can be killed at any moment, any work it has in flight may be lost.
The combination of threading and queues
So far we’ve only looked at cases where we know exactly what we want the threads to be working on when we start them. However, it’s often the case that we need to start a group of worker threads, then feed them tasks as they arrive.
The best data structure for dealing with these tasks is, of course, a queue, and Python provides a queue module which is especially geared towards threading applications. FIFO, LIFO and priority queues are available.
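For instance, a quick sketch of the three variants:

from queue import Queue, LifoQueue, PriorityQueue

fifo = Queue()          # first in, first out
lifo = LifoQueue()      # last in, first out
prio = PriorityQueue()  # lowest-valued entries retrieved first

prio.put((2, "low priority"))
prio.put((1, "high priority"))
print(prio.get())  # -> (1, 'high priority')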
Using a queue.Queue object to add, get, and mark tasks as done is as simple as this:
from queue import Queue

# maxsize=0 means infinite size limit
tasks = Queue(maxsize=0)

tasks.put("a task")
tasks.put("another task")

while not tasks.empty():
    print(tasks.get())  # execute task
    tasks.task_done()
Ok, that’s pretty basic so far. Now let’s use it to create a tasks queue for our website downloader. We’ll create a group of worker threads which can all access the queue and wait for tasks to come in.
"""Download webpages in threads, using `threading` and `queue`.""" import requests import threading from queue import Queue NUM_WORKER_THREADS = 3 def download_page(page_info): """Download and save webpage.""" r = requests.get(page_info['url']) with open(page_info['name'] + '.html', 'w') as save_file: save_file.write(r.text) def handle_tasks(tasks_queue): """Monitor tasks queue and execute tasks as appropriate.""" while True: download_page(tasks_queue.get()) tasks_queue.task_done() if __name__ == '__main__': tasks = Queue(maxsize=0) # Create and start worker threads for i in range(NUM_WORKER_THREADS): worker = threading.Thread(target=handle_tasks, args=(tasks,)) worker.daemon = True worker.start() # Add some tasks to the queue tasks.put({'name': 'google', 'url': "http://google.com"}) tasks.put({'name': 'reddit', 'url': "http://reddit.com"}) tasks.put({'name': 'ebay', 'url': "http://ebay.com"}) tasks.put({'name': 'bbc', 'url': "http://bbc.co.uk"}) tasks.join()
Note that in this example all the tasks were added in one go for the sake of brevity, but in a real application the tasks could trickle in at any rate. Here we exit the program once the tasks queue has been fully processed, using its .join() method, which blocks until every task has been marked done.
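As a rough sketch of that trickle-in pattern (the producer function and its timings are illustrative assumptions of ours, not part of the example above), a separate thread could feed the same queue as work arrives:

"""Sketch: tasks trickling into the queue from a producer thread."""
import time
import threading
from queue import Queue

def worker(tasks_queue):
    """Process items from the queue as they arrive."""
    while True:
        item = tasks_queue.get()
        print("processing", item)  # stand-in for real work
        tasks_queue.task_done()

def producer(tasks_queue):
    """Hypothetical source of work: one new task every half second."""
    for i in range(5):
        tasks_queue.put("task %d" % i)
        time.sleep(0.5)

if __name__ == '__main__':
    tasks = Queue(maxsize=0)
    for _ in range(2):
        w = threading.Thread(target=worker, args=(tasks,))
        w.daemon = True
        w.start()
    feeder = threading.Thread(target=producer, args=(tasks,))
    feeder.start()
    feeder.join()  # wait until the producer has submitted everything
    tasks.join()   # then wait until the workers have finished it all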
The multiprocessing module
The threading module is great for detailed control of threads, but what if we want that finer level of control for processes? You might think this would be more challenging, since once a process is launched it is completely separate and independent, and therefore harder to control than a new thread, which remains within the current interpreter and memory space.
Fortunately for us, the Python developers worked hard to create a multiprocessing module with an interface that is almost identical to that of the threading module. This means that launching processes follows exactly the same syntax as our examples above. Our simple downloader becomes:
"""Download webpages in threads, using `multiprocessing`.""" import requests import time import multiprocessing download_list = [ {'name': 'google', 'url': "http://google.com"}, {'name': 'reddit', 'url': "http://reddit.com"}, {'name': 'ebay', 'url': "http://ebay.com"}, {'name': 'bbc', 'url': "http://bbc.co.uk"} ] def status_update(): """Print 'Still downloading' at regular intervals.""" while True: print("Still downloading") time.sleep(0.1) def download_page(page_info): """Download and save webpage.""" r = requests.get(page_info['url']) with open(page_info['name'] + '.html', 'w') as save_file: save_file.write(r.text) if __name__ == '__main__': for download in download_list: downloader = multiprocessing.Process(target=download_page, args=(download,)) downloader.start() status = multiprocessing.Process(target=status_update) status.daemon = True status.start()
We think it’s awesome that Python manages to keep the same syntax between the threading and multiprocessing modules, when the action taking place under the hood is so different.
When it comes to distributing data between processes, the queue.Queue we used for threading will not work, because a queue.Queue is fundamentally just a data structure within the current process (albeit one which is cleverly locked and mutexed). Thankfully there exists a multiprocessing.Queue, which is specifically designed for inter-process communication. Behind the scenes, it serializes your data and sends it through a pipe between processes, which is a very convenient abstraction.
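Here’s a minimal sketch of that in action (the sentinel-based shutdown is our own choice: unlike queue.Queue, a plain multiprocessing.Queue has no task_done() or join(), so we signal the worker to stop explicitly):

"""Sketch: passing data between processes with multiprocessing.Queue."""
import multiprocessing

def worker(tasks_queue):
    """Runs in a separate process; items arrive already deserialized."""
    while True:
        item = tasks_queue.get()
        if item is None:  # sentinel value tells the worker to stop
            break
        print("got", item)

if __name__ == '__main__':
    tasks = multiprocessing.Queue()
    proc = multiprocessing.Process(target=worker, args=(tasks,))
    proc.start()

    tasks.put({'name': 'google', 'url': "http://google.com"})
    tasks.put(None)  # signal the worker to exit
    proc.join()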
Summary
Writing concurrent code in Python can be a lot of fun, thanks to the built-in language features that abstract away many of the usual problems. That doesn’t mean a detailed level of control is out of reach; rather, the barrier to getting started with simple tasks is lowered. So when you’re stuck waiting for one task to finish before starting the next, give one of these techniques a try.