As Python was first developed 29 years ago, it is unsurprisingly that it was designed more as a linear programming language when single-core CPUs are still dominating the market. In fact, CPython developer may still be feeling the heat when it comes to concurrency. Luckily, we Python noobs can lay back and enjoy the fruits of PEP 371, where multiprocessing was officially added to the standard libraries back in 2008, and PEP 3156 where asyncio made its way to standard libraries in 2012. In the first part of the series on parallelism with Python, we are going to look into multithreading, how to implement it with multiprocessing.
Multi-threading vs Multi-processing
TL;DR: Parallelise a CPU-bound task with multiprocessing, and a I/O-bound task with multithreading
Thread is a separate flow of execution. When you have a pool of worker threads, they will be executing close-to concurrently. These threads will share a common data space, hence the concept of Global Interpretor Lock (GIL) is important when you try to multi-thread your python script. What GIL does is, in short limit one Python thread to run at a time to avoid inconsistent changes in shared memories, or in long is to create a thread-safe memory management environment for including C libraries that are not thread-safe into the Python ecosystem (or go read more on CPython, unfortunately this is out of my realm of knowledge). As such, threads will be locking the caller thread when they need to use the CPU for computations. This makes threads less efficient for CPU-bound tasks and more so for I/O-bound tasks, e.g. networking, issuing database operations, etc.
Process can be understood as a separate Python process that has been forked from the parent process and has its own Python interpretor. Because of that, each process will has its own GIL, and will not lock other processes out when executing on a CPU core. The price for avoiding the GIL bottleneck is to have a larger memory overhead as a copy of the address space, or copy-on-write if supported is needed for every process. Because of that, processes are usually more preferable when conducting CPU-bound tasks e.g. matrix manipulations.
Python’s Multithreading Implementations
Python’s standard library, multiprocessing has an interface for threading available via multiprocessing.pool.Pool. For seasoned Python veterans, threading was the original library for this. This interface provides the following functionalities, but each method has different restrictions on how arguments can be passed and without easy way for us to track progress:
apply(func[, args[, kwds]]) (This is implemented as apply_async( ... ).get())
Before we will dive into these methods, discuss the pros and cons, let’s set the scene first:
When running it with %time func(0), we have got the following results:
CPU times: user 89.6 ms, sys: 35.6 ms, total: 125 ms
Wall time: 127 ms
Let’s say we want to, for some reason, run it a hundred times:
n=100data= range(n)
# for loop
%%timeit -n 3for i in data:
func(i)
>>> 7.93 s ± 67.5msper loop (mean ± std. dev. of 7 runs, 3 loops each)
# for loop with tqdm
%%capture capture
%%timeit -n 3
for i in tqdm(data):
func(i)
print(capture.stdout.strip())
>>> 7.93 s ± 67.5msper loop (mean ± std. dev. of 7 runs, 3 loops each)
APPLY & APPLY_ASYNC
Pass a function to the pool of threads. This comes in two variants: .apply() and .apply_async(). When using the apply() method, you will need to pass a callable, together with some optional args and/or kwargs. When executing the function, it would block the calling thread until the result is ready or an exception has been raised. Hence, apply_async() is usually preferred.
When using apply_async(), instead of the actual result, it would return a multiprocessing.pool.ApplyResult which is essentially a promise of a result that you can obtain using multiprocessing.pool.ApplyResult.get() function. you can also pass in a callback and a error_callback function which will be executed when the thread has finished it job and when it has failed like it has always been in my code respectively.
Pros:
passing functions to the worker pool one by one allow better flexibility in scheduling
the only variant in Python’s implementation that supports both args and kwargs
Cons:
assigning functions one by one increasing network overhead in the form of inter-thread communications
apply will block the caller thread, and can easily be slower than the benchmark
depending on the specific function, using apply_async may have worse throughput as the payload cannot be passed as batches
APPLY
%%timeit -n 3
with Pool(processes=n_job) as pool:
results = list(pool.apply(func, args=[i]) for i in data)
>>> 11.1 s ± 901msper loop (mean ± std. dev. of 7 runs, 3 loops each)
APPLY + TQDM
%%capture capture
%%timeit -n 1
with Pool(processes=n_job) as pool:
results = list(tqdm((pool.apply(func, args=[i]) for i in data), total=n))
print(capture.stdout.strip())
>>> 11.3 s +- 1.19 s per loop (mean +- std. dev. of 7 runs, 1loopeach)
APPLY_ASYNC
%%timeit -n 10
withPool(processes=n_job) aspool:
results = list(pool.apply_async(func, args=(i, )) for i in data)
results = [r.get() for r in results]
>>> 3 s ± 159 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
APPLY_ASYNC + TQDM
%%capture capture
%%timeit -n 10
withPool(processes=n_job) aspool:
results = [pool.apply_async(func, args=(i, )) for i in data]
results = [r.get() for r in tqdm(results)]
print(capture.stdout.strip())
>>> 3.08 s +- 108msper loop (mean +- std. dev. of 7 runs, 10 loops each)
APPLY_ASYNC + TQDM in Callback
%%capturecapture
%%timeit -n 10
with tqdm(total=n) as pbar:
with Pool(processes=n_job) as pool:
def callback(*args):
# callback
pbar.update()
return
results = [
pool.apply_async(
func,
args=(i, ),
callback=callback) for i in data]
results = [r.get() for r in results]
print(capture.stdout.strip())
>>> 3.08 s +- 105msper loop (mean +- std. dev. of 7 runs, 10 loops each)
MAP & MAP_ASYNC
Arguments:
func: A callable for each of the individual thread to execute
iterable: An iterable of arg that will be passes to the function
chunksize: An optional positive integer (default 1) that specifies the (approximate) size of chunks, named tasks, that iterable will be chopped into. These tasks and will be assigned to the threads for executing the flow defined in func.
callback: (Only for map_async) An optional callable (default None) that will be called everytime when results have been returned.
error_callback: (Only for map_async) An optional callable (default None) that will be called everytime when an uncaught exception has been raised in func.
Returns:
A list of results
Pros:
No need to iterate through the list of data as this will be handled by map or map_async directly
chunksize allows better throughput
Order is preserved, i.e. order of execution is same as the order of output
A major disadvantage for apply and apply_async is that we will need to iterative execute pool.apply or the asynchronous variant for each of the set of args and/or kwargs. Un-optimised iteration is almost always nightmares when it comes to scalability. In this case, pool.map is our dreamcatcher. All we need to do is pass in the iterable, and viola.
Just like apply, map has an asynchronous variant which usually performs better as map would block the calling thread until results have been returned.
Another very handy argument is chunksize, which accepts a natural number with a default of 1. Our iterable will be splitted into sections, called tasks, of length roughly the length of chunksize. Each of the thread will be a task, which they would need to finish before they ask for a new task. In essense, chunksize == 1 gives you better flexibility on scheduling each task, while having a chunksize > 1 gives you (in general) better throughput and reduces number of inter-thread communications. A general rule of thumb is to have chunksize=1 if you do not know how long each task would take to finish (e.g. optimisation), and have chunksize=len(iterable) // n_job if you are expecting the tasks to finish in roughly the same time.
Cons:
map and map_async will convert the argument iterable into a list before execution, causing additional memory overhead
map will block the calling threads until results has been returned
map and map_async only allow one arg to be passed to the function, meaning that the other arguments (from the second onwards) should have a pre-defined value or a default value
Be careful of memory usage especially when the iterable gets longer
map and map_async also come with some disadvantages. One that is particularly annoying is that it only allows one arg. If your function accepts multiple args or kwargs, here are three ways (in general) to walk around it:
1. Just make it happen
Not recommended; not maintainable.
You will need to specify every argument even if they are optional, and may also need to wrap your target function to make it happen.
2. Partial Method
Only if the varying argument is the first argument accepted by the function.
>>> 3.2 s ± 131msper loop (mean ± std. dev. of 7 runs, 10 loops each)
IMAP & IMAP_UNORDERED
Arguments:
func: A callable for each of the individual thread to execute
iterable: An iterable of arg that will be passes to the function
chunksize: An optional positive integer (default 1) that specifies the (approximate) size of chunks, named tasks, that iterable will be chopped into. These tasks and will be assigned to the threads for executing the flow defined in func.
Returns:
A generator of results
Pros:
No need to iterate through the list of data as this will be handled by map or map_async directly
chunksize allows better throughput
Argument iterable will not be converted to list before passing to threads, avoiding stack overflow when iterable is a long, non-list iterable
Results will be kept as a generator instead of being converted into a list before returning
imap_unordered will return results as soon as it completes without guaranteeing preservation of order in the input iterable
imap is officially defined as a lazier version of map, meaning that it would be cast your input iterable into a list before chopping it into tasks, nor would it cast the results into list before returning. Instead it uses iter() and .next() to delegate tasks. Unlike map which would return a list of results or map_async which returns a promise of a result, imap and imap_unordered return results as soon as the worker threads yield results. Because of this difference, results cannot be casted into a list and instead would need to be in a generator, where users can use next() to fetch the latest results. This will be particularly if your program do not need to wait for all the results to start any post-processing. On top of that if order of execution is not important to you, imap_unordered would theoretically be better as it would yield results as soon as the execution is done regardless of the order of input iterable (i.e. if you pass in (1, 2, 3), you may get the results (3, 2, 1), with the result order completely determined by run time).
Just like map and map_async, imap and its variant also have chunksize, which accepts a natural number with a default of 1. A general rule of thumb is to have chunksize=1 if you do not know how long each task would take to finish (e.g. optimisation), and have chunksize=len(iterable) // n_job if you are expecting the tasks to finish in roughly the same time.
Cons:
You may be working with an incomplete set of results when interacting with the generator
All the convenience of list would not be available when handling the result
imap and imap_unordered only allow one arg to be passed to the function, meaning that the other arguments (from the second onwards) should have a pre-defined value or a default value
imap_unordered will return results as soon as it completes without guaranteeing preservation of order in the input iterable
Be careful of memory usage especially when the iterable gets longer
In short, don’t use imap and imap_unordered if you are not comfortable with generator or if you need to complete result before proceeding, and don't use imap_unordered if order is important to you. If you want to convert the generator back to a list, shoo, go use map and map_async instead.
To walk around the restriction of one arg, please refer to map and map_async.
>>> 3.19 s +- 173msper loop (mean +- std. dev. of 7 runs, 10 loops each)
STARMAP & STARMAP_ASYNC
Arguments:
func: A callable for each of the individual thread to execute
iterable: An iterable of iterable of arg(S) that will be passes to and unpacked by the function
chunksize: An optional positive integer (default 1) that specifies the (approximate) size of chunks, named tasks, that iterable will be chopped into. These tasks and will be assigned to the threads for executing the flow defined in func.
callback: (Only for starmap_async) An optional callable (default None) that will be called everytime when results have been returned.
error_callback: (Only for starmap_async) An optional callable (default None) that will be called everytime when an uncaught exception has been raised in func.
Returns:
A list of results
Pros:
Multiple args can be passed to func
chunksize allows better throughput
Order is preserved, i.e. order of execution is same as the order of output
starmap and starmap_async were introduced in Python 3.3 to address exactly the issue where multiple args cannot be easily passed to the target function. Instead of passing in an iterable of arg, we will need to pass in an iterable of iterable of args i.e. if we pass in [(0, 1), (1, 2)] into function f, it would execute f(0, 1), and f(1, 2). So remember to wrap your args into a tuple (memory does not grow on trees) even if it has only one arg (we don't judge). For cases where you will have a mix of constant arguments and varying arguments, there are two ways in general to treat it:
1. Partial Method
Only if the varying arguments are the first arguments accepted by the function.
2. Repeat
By defining all the arguments for each of the function call, this would work for varying argument at different position.
Cons:
starmap and starmap_async will convert the argument iterable into a list before execution, causing additional memory overhead
starmap will block the calling thread until results has been returned
Be careful of memory usage especially when the iterable gets longer
starmap and starmap_async do not support kwargs, so you either need to use the Repeat Method above for all the arguments, or there is nothing obvious in multiprocessing library that can help you
>>> 3.18 s ± 105msper loop (mean ± std. dev. of 7 runs, 10 loops each)
Final Leaderboard
From the table above, we can see that when used correctly, multithreading has been able to speed up our simple I/O heavy task. These numbers were obtained on an EC2 instance at its idle state, which means it will most likely be different from what you will get. That being said, apply and apply_async have been consistently outperforming the rest based on my experience and experiment, so if you would like to try multithreading, apply and apply_async should be your best shout.
In the next part of this series, we are going to look into why multiprocessing may not be good enough for our daily routine and also walk through some other alternatives that would hopefully help you speed up your code. Let me know if you have learnt something new from this! And please also let me know if there are other neat tricks that I have missed!