The provided content discusses error handling and task cancellation in Python's asyncio library, emphasizing the importance of robust exception management and the strategic use of task cancellation to maintain system performance and integrity.
Abstract
The article delves into advanced patterns for managing errors and cancellations within asyncio coroutines. It begins by acknowledging the power of asyncio for asynchronous programming but quickly shifts focus to the realities of dealing with errors in such systems. The author illustrates how unhandled exceptions in tasks can go unnoticed when using ensure_future and stresses the necessity of proper exception handling to avoid silent failures. The discussion includes practical examples of handling exceptions within periodic coroutines and the use of Future.result() to retrieve task results, which can raise exceptions if the task failed. The article also explores the implications of raising and not raising exceptions, and how to exit the event loop cleanly upon encountering errors. The author introduces the gather function's return_exceptions argument, which allows for the collection of exceptions alongside successful results, thus preventing the immediate propagation of exceptions. Furthermore, the article compares asyncio.gather with asyncio.wait, highlighting their differences in handling task results and the control they offer over task completion conditions. The concept of cancelling tasks is presented as a means to prevent overloading services, such as APIs, by sending too many requests. The article concludes with a discussion on handling task cancellation by catching CancelledError exceptions to perform necessary cleanup. The author wraps up by acknowledging the complexities of asyncio but also celebrates its role in advancing Python's capabilities in asynchronous programming and its growing ecosystem.
Opinions
The author believes that asyncio is a powerful tool for asynchronous programming in Python but requires careful handling of exceptions to prevent system failures.
It is implied that developers should be cautious when using ensure_future as it can lead to unnoticed exceptions, potentially causing silent failures in the system.
The article suggests that proper exception handling is not just about catching exceptions but also about logging them appropriately and acting on them, such as stopping the event loop if necessary.
The author emphasizes the importance of being able to retrieve results from tasks that have completed successfully even when some tasks have raised exceptions.
There is an opinion that developers should be mindful of the rate at which they send requests to external services to avoid overwhelming them, and that task cancellation is a key strategy in maintaining a healthy interaction with these services.
The author seems to appreciate the elegance of Python's exception handling mechanism, even in the context of asynchronous task cancellation, as it aligns with the language's design philosophy.
Despite acknowledging its flaws and complexities, the author is optimistic about the future of asyncio and its impact on the Python community, noting the continuous improvements and the rich ecosystem that has developed around the library.
Asyncio Coroutine Patterns: Errors and cancellation
This is the second part of a two-part series on coroutine patterns in asyncio. To fully benefit from this article, please read the first installment: Asyncio Coroutine Patterns: Beyond await
In the first part of this series we concluded that asyncio is awesome, coroutines are awesome and our code is awesome. But sometimes the outside world is not as awesome and we have to deal with it.
Now, for this second part of the series, I’ll run over the options asyncio gives us to handle errors when using these patterns as well as cancelling tasks so as to make our asynchronous systems robust and performant.
If you are really new to asyncio I recommend having a read through my very first article on the subject, Asyncio for the Working Python Developer, before diving into this series.
I will continue using the Hacker News API and the async/await syntax introduced in Python 3.5. All example code is also available in the GitHub repo for this series.
Let’s get to it!
Error handling
If you recall, at the end of the previous article we had a lovely system using a periodic task that fetched the top stories in HN and recursively calculated the number of comments for each of them. Here’s the full listing:
Notice how URLFetcher raises a BoomException after a set number of fetches and there’s no exception handling anywhere in the code.
Here’s our periodic coroutine one more time:
When we run the script we get something like:
[16:13:16] Calculating comments for top 5 stories. (1)
[16:13:16] Waiting for 5 seconds...
[16:13:17] Exception in callback poll_top_stories_for_comments.<locals>.callback(<Task finishe...ion('BOOM!',)>) at 01_error_handling.py:126
handle: <Handle poll_top_stories_for_comments.<locals>.callback(<Task finishe...ion('BOOM!',)>) at 01_error_handling.py:126>
Traceback (most recent call last):
File "/Users/yeray/.pyenv/versions/3.6.0/lib/python3.6/asyncio/events.py", line 126, in _run
self._callback(*self._args)
File "01_error_handling.py", line 127, in callback
fetch_count = fut.result()
File "01_error_handling.py", line 104, in get_comments_of_top_stories
results = await asyncio.gather(*tasks)
File "01_error_handling.py", line 71, in post_number_of_comments
response = await fetcher.fetch(session, url)
File "01_error_handling.py", line 60, in fetch
raise BoomException('BOOM!')
BoomException: BOOM!
[16:13:21] Calculating comments for top 5 stories. (2)
[16:13:21] Waiting for 5 seconds...
...
See how the system did not crash? Our Tasks completed, and when we retrieved their result in the callback an unhandled exception was raised. Usually this would have caused the Python interpreter to stop, but it simply continued on, attempting to fetch posts and comments a second time.
This is an important point: when using ensure_future, exceptions will not crash the system and might go unnoticed. So you need to ensure you’re handling exceptions and logging them appropriately.
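A minimal sketch of this behaviour, independent of the HN code. The names fetch_comments, poll and run_demo are hypothetical stand-ins, and the failing coroutine simply raises on every call. The loop's exception handler is replaced only so the sketch can count what asyncio would otherwise just print:

```python
import asyncio


class BoomException(Exception):
    """Stand-in for the exception raised by the article's URLFetcher."""


async def fetch_comments():
    # Hypothetical stand-in for the real fetching coroutine: always fails.
    await asyncio.sleep(0)
    raise BoomException('BOOM!')


async def poll(iterations, period=0.01):
    completed = 0

    def callback(fut):
        # No try/except: result() re-raises the task's exception here,
        # inside a callback that is run by the event loop.
        fut.result()

    for _ in range(iterations):
        future = asyncio.ensure_future(fetch_comments())
        future.add_done_callback(callback)
        await asyncio.sleep(period)
        completed += 1
    return completed


def run_demo(iterations=3):
    reported = []  # contexts handed to the loop's exception handler

    async def main():
        loop = asyncio.get_running_loop()
        # Collect what asyncio would otherwise only log to stderr.
        loop.set_exception_handler(lambda loop, ctx: reported.append(ctx))
        return await poll(iterations)

    completed = asyncio.run(main())
    return completed, reported


if __name__ == '__main__':
    completed, reported = run_demo()
    print(f'iterations completed: {completed}')
    print(f'exceptions reported to the loop: {len(reported)}')
```

Every iteration fails, yet the periodic coroutine runs to the end: the exceptions only ever reach the loop's exception handler, never the code that scheduled the tasks.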
Let’s change our periodic coroutine to handle this exception. Remember, since we’re using the Future.result() method to retrieve the result of the Task, it will raise the exception if that was its result:
[16:21:03] Calculating comments for top 5 stories. (1)
[16:21:03] Waiting for 5 seconds...
[16:21:04] Something went BOOM
Traceback (most recent call last):
File "01b_error_handling.py", line 128, in callback
fetch_count = fut.result()
File "01b_error_handling.py", line 104, in get_comments_of_top_stories
results = await asyncio.gather(*tasks)
File "01b_error_handling.py", line 71, in post_number_of_comments
response = await fetcher.fetch(session, url)
File "01b_error_handling.py", line 60, in fetch
raise BoomException('BOOM!')
BoomException: BOOM!
[16:21:08] Calculating comments for top 5 stories. (2)
[16:21:08] Waiting for 5 seconds...
[16:21:08] Something went BOOM
Traceback (most recent call last):
File "01b_error_handling.py", line 128, in callback
fetch_count = fut.result()
File "01b_error_handling.py", line 104, in get_comments_of_top_stories
results = await asyncio.gather(*tasks)
File "01b_error_handling.py", line 71, in post_number_of_comments
response = await fetcher.fetch(session, url)
File "01b_error_handling.py", line 60, in fetch
raise BoomException('BOOM!')
BoomException: BOOM!
This has not completely solved the issue as the system will still not crash, but at least the exception is being logged appropriately.
However, as good practice dictates, we’re catching a specific type of exception. What if some other type of exception occurs? Let’s simulate this with a small change in URLFetcher:
[16:23:22] Calculating comments for top 5 stories. (1)
[16:23:22] Waiting for 5 seconds...
[16:23:22] Exception in callback poll_top_stories_for_comments.<locals>.callback(<Task finishe... exception',)>) at 01c_error_handling.py:129
handle: <Handle poll_top_stories_for_comments.<locals>.callback(<Task finishe... exception',)>) at 01c_error_handling.py:129>
Traceback (most recent call last):
File "/Users/yeray/.pyenv/versions/3.6.0/lib/python3.6/asyncio/events.py", line 126, in _run
self._callback(*self._args)
File "01c_error_handling.py", line 131, in callback
fetch_count = fut.result()
File "01c_error_handling.py", line 107, in get_comments_of_top_stories
results = await asyncio.gather(*tasks)
File "01c_error_handling.py", line 74, in post_number_of_comments
response = await fetcher.fetch(session, url)
File "01c_error_handling.py", line 63, in fetch
raise Exception('Random generic exception')
Exception: Random generic exception
Again, the error slips past our own handling: only asyncio’s default exception handler reports it, and the system carries on as if nothing had happened.
The key points here are:
when using await, handle exceptions with try..except as usual.
when using ensure_future, remember to catch generic exceptions and act accordingly.
Here’s a complete listing of the system handling both cases:
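As a rough stand-in for that listing, the two key points can be sketched as follows. This is not the series' actual code: fetch, number_of_comments and make_callback are hypothetical names, and the fetcher simply raises whatever exception it is handed.

```python
import asyncio
import logging

log = logging.getLogger(__name__)


class BoomException(Exception):
    """Stand-in for the error raised by the article's URLFetcher."""


async def fetch(fail_with=None):
    # Hypothetical fetcher: raises whatever exception it is given.
    await asyncio.sleep(0)
    if fail_with is not None:
        raise fail_with
    return 42


async def number_of_comments(fail_with=None):
    # Key point 1: with await, a plain try..except is enough.
    try:
        return await fetch(fail_with)
    except BoomException as e:
        log.error('Error retrieving post: %s', e)
        raise


def make_callback(outcomes):
    def callback(fut):
        # Key point 2: result() re-raises inside the callback, so catch the
        # expected error first and a generic Exception as a last resort.
        try:
            outcomes.append(('ok', fut.result()))
        except BoomException as e:
            log.error('Error retrieving comments: %s', e)
            outcomes.append(('boom', e))
        except Exception as e:
            log.exception('Unexpected error')
            outcomes.append(('unexpected', e))
    return callback


async def main():
    outcomes = []
    failures = [None, BoomException('BOOM!'),
                Exception('Random generic exception')]
    for failure in failures:
        task = asyncio.ensure_future(number_of_comments(failure))
        task.add_done_callback(make_callback(outcomes))
    await asyncio.sleep(0.01)  # give the tasks and callbacks time to run
    return outcomes


if __name__ == '__main__':
    for kind, value in asyncio.run(main()):
        print(kind, repr(value))
```

The generic except clause is what stops unexpected errors from escaping to the loop's default handler. On Python 3.8 and later it does not swallow CancelledError, which derives from BaseException there.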
Ok, so given that we’ve been hitting errors from the HN API we should probably stop the system, otherwise it’s just going to keep raising error after error.
We can do it the hard way and add loop.stop() to our handling of errors in the callback code:
[16:34:00] Calculating comments for top 5 stories. (1)
[16:34:00] Waiting for 5 seconds...
[16:34:01] Error retrieving post 15052691: BOOM!
[16:34:01] Error retrieving comments for top stories: BOOM!
Traceback (most recent call last):
File "02b_error_handling.py", line 175, in <module>
loop, session, args.period, args.limit))
File "/Users/yeray/.pyenv/versions/3.6.0/lib/python3.6/asyncio/base_events.py", line 464, in run_until_complete
raise RuntimeError('Event loop stopped before Future completed.')
RuntimeError: Event loop stopped before Future completed.
Well, that stopped it all right, but asyncio did not like it: we’re forcibly stopping the loop while other Futures are still pending. Not cool.
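The error is easy to reproduce in isolation. In this sketch (periodic and run_and_stop are hypothetical names), the loop is stopped while the coroutine passed to run_until_complete is still sleeping:

```python
import asyncio


async def periodic(loop):
    # Hypothetical periodic coroutine: ask the loop to stop, then keep waiting.
    loop.call_soon(loop.stop)
    await asyncio.sleep(1)  # still pending when the loop stops
    return 'finished'


def run_and_stop():
    loop = asyncio.new_event_loop()
    task = loop.create_task(periodic(loop))
    try:
        return loop.run_until_complete(task)
    except RuntimeError as e:
        return f'RuntimeError: {e}'
    finally:
        # Cancel the leftover task and let it finish, so the loop can be
        # closed without "Task was destroyed but it is pending" warnings.
        task.cancel()
        loop.run_until_complete(asyncio.gather(task, return_exceptions=True))
        loop.close()


if __name__ == '__main__':
    print(run_and_stop())
```

run_until_complete raises because the future it was waiting on never completed; the loop itself stopped exactly as requested.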
In order to exit cleanly we need to return from the periodic coroutine, so that the loop sees it as complete and stops on its own.
However, the exception is handled inside a callback, and since the callback is triggered independently we need some way of reading the error from the periodic coroutine’s main loop and acting accordingly.
We can do that by defining a list of errors in the enclosing scope and mutating it in the callback so we can exit in the next iteration if there are any:
[16:48:59] Calculating comments for top 5 stories. (1)
[16:48:59] Waiting for 5 seconds...
[16:48:59] Error retrieving top stories: Random generic exception
[16:48:59] Unexpected error
Traceback (most recent call last):
File "02c_error_handling.py", line 153, in callback
fetch_count = fut.result()
File "02c_error_handling.py", line 112, in get_comments_of_top_stories
response = await fetcher.fetch(session, TOP_STORIES_URL)
File "02c_error_handling.py", line 64, in fetch
raise Exception('Random generic exception')
Exception: Random generic exception
[16:49:04] Error detected, quitting
I know, not pretty. Notice the timestamp as well: the return happened after the sleep, which is not ideal, but at least we get a clean exit from the loop.
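The shape of that pattern, reduced to a sketch with hypothetical names (failing_fetch, poll) and a much shorter period than the 5 seconds used above:

```python
import asyncio


async def failing_fetch():
    # Hypothetical stand-in for the fetching coroutine: always fails.
    await asyncio.sleep(0)
    raise Exception('Random generic exception')


async def poll(period=0.01, max_iterations=5):
    errors = []      # mutated by the callback, read by the loop below
    iterations = 0

    def callback(fut):
        try:
            fut.result()
        except Exception as e:
            errors.append(e)

    while iterations < max_iterations:
        if errors:
            # Returning lets the event loop finish cleanly, but the error
            # is only noticed after the previous sleep has elapsed.
            break
        iterations += 1
        task = asyncio.ensure_future(failing_fetch())
        task.add_done_callback(callback)
        await asyncio.sleep(period)
    return iterations, errors


if __name__ == '__main__':
    iterations, errors = asyncio.run(poll())
    print(f'stopped after {iterations} iteration(s): {errors[0]}')
```

The list works here because the callback only mutates it and never rebinds the name, so no nonlocal declaration is needed.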
Remember this happens only when using ensure_future. You can always go back to using await, as described in my first article, if this is unacceptable.
To raise or not to raise
Let’s step back for a moment though. In our example we’re being quite extreme and raising errors after just a few fetches. But if we were to increase that number, then by the time the first exception occurs we may have actually calculated the comments for some of the stories; they are, after all, separate tasks. Is there a way to retrieve these possibly completed tasks?
Why, yes, there is! More than one in fact, but let’s put a pin in that and check the gather docs. Notice there’s a return_exceptions argument that defaults to False:
If return_exceptions is True, exceptions in the tasks are treated the same as successful results, and gathered in the result list; otherwise, the first raised exception will be immediately propagated to the returned future.
Note that this means gather will not raise an exception anymore, so we don’t need the try..except clause in our coroutine. We do, however, need to manually check the list of results to see if there were any errors.
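A small sketch of that manual check, using a hypothetical comments_for coroutine in place of the real HN fetching code:

```python
import asyncio


class BoomException(Exception):
    """Stand-in for the error raised by the article's URLFetcher."""


async def comments_for(post_id, fail=False):
    # Hypothetical stand-in: returns a made-up comment count or fails.
    await asyncio.sleep(0)
    if fail:
        raise BoomException('BOOM!')
    return post_id * 10


async def main():
    post_ids = (1, 2, 3)
    results = await asyncio.gather(
        comments_for(1),
        comments_for(2, fail=True),
        comments_for(3),
        return_exceptions=True,
    )
    succeeded, failed = [], []
    # Results come back in argument order; exceptions are ordinary list items.
    for post_id, result in zip(post_ids, results):
        if isinstance(result, Exception):
            failed.append((post_id, result))
        else:
            succeeded.append((post_id, result))
    return succeeded, failed


if __name__ == '__main__':
    succeeded, failed = asyncio.run(main())
    print('succeeded:', succeeded)
    print('failed:', failed)
```

Because gather preserves argument order, pairing results with post IDs is a simple zip.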
In the following example I increased the number of allowed fetches to allow for some results to come back.
[17:04:43] Calculating comments for top 5 stories. (1)
[17:04:43] Waiting for 5 seconds...
[17:04:47] Error retrieving comments for top stories: BOOM!
[17:04:47] Error retrieving comments for top stories: BOOM!
[17:04:47] Post 15051645 has 75 comments (1)
[17:04:47] Error retrieving comments for top stories: BOOM!
[17:04:47] Post 15052192 has 13 comments (1)
[17:04:47] > Calculating comments took 3.93 seconds and 511 fetches
[17:04:48] Calculating comments for top 5 stories. (2)
[17:04:48] Waiting for 5 seconds...
Look at that! We managed to get results for 2 out of the 5 top stories!
This particular example might require you to change MAXIMUM_FETCHES depending on how popular the top stories in HN are at the point you’re running the script. I suggest increasing it to a high number that allows all tasks to complete, and then lowering it to just below the reported fetch count.
Let’s come back to that pin we put on the different ways to retrieve completed tasks:
Gather vs wait (vs as_completed)
In the asyncio API there are two main functions for scheduling a set of tasks at the same time: our familiar gather, and wait. The main differences between them are:
wait returns a tuple of two sets of Task objects, done and pending, while gather returns the results of those tasks.
gather returns the results in order, i.e. the first element of the returned list is the result of the Task object passed as a first parameter to it. In contrast, wait returns the Task objects in unordered sets, so we need to manually keep track of which result corresponds to which Task.
gather, by default, will propagate the first exception as soon as any of the tasks raises one, or return whenever all tasks are done if no exception is raised. As we’ve seen, if return_exceptions is True it will return when all tasks are “done” even if some raised an exception. Conversely, wait has a specific parameter return_when that can be one of FIRST_COMPLETED, FIRST_EXCEPTION or ALL_COMPLETED, allowing us finer control over when it returns.
Finally, wait includes a timeout argument. gather does not have one, but it’s possible to combine it with wait_for to mimic that behaviour.
Additionally, asyncio includes as_completed which returns an iterator of futures you can await as they are done. I cover both wait and as_completed in my other article Asyncio for the Working Python Developer if you’re interested.
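The differences listed above can be seen side by side in a short sketch. The work coroutine and its delays are made up for illustration:

```python
import asyncio


async def work(name, delay):
    # Hypothetical unit of work: sleep, then return its own name.
    await asyncio.sleep(delay)
    return name


async def main():
    # gather: results in argument order, regardless of completion order.
    gathered = await asyncio.gather(work('slow', 0.1), work('fast', 0.01))

    # wait: two unordered sets of Task objects; results are read per Task.
    tasks = [asyncio.ensure_future(work('slow', 0.2)),
             asyncio.ensure_future(work('fast', 0.01))]
    done, pending = await asyncio.wait(
        tasks, return_when=asyncio.FIRST_COMPLETED)
    first_done = {task.result() for task in done}
    for task in pending:
        task.cancel()

    # as_completed: await the results in completion order.
    completed = [await fut for fut in
                 asyncio.as_completed([work('slow', 0.1), work('fast', 0.01)])]

    # gather has no timeout argument, but it can be wrapped in wait_for.
    try:
        await asyncio.wait_for(
            asyncio.gather(work('a', 0.5), work('b', 0.5)), timeout=0.01)
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True

    return gathered, first_done, completed, timed_out


if __name__ == '__main__':
    print(asyncio.run(main()))
```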
Why do I mention all this? Because we’re going to be needing it very soon.
Cancelling tasks
It may not look like it, but we’re sending potentially thousands of requests to the HN API, more if we were to increase the number of top stories to get comments for, even more if any of the top stories happen to be controversial. We’re basically DDoS-ing the poor thing. Up until now it’s been fine with it but if we abuse it we may be getting some 429 Too Many Requests or 420 Enhance your calm.
So, ideally, as an exception is raised we should avoid making things worse for ourselves and cancel any scheduled tasks. Yeah, you heard me: you can cancel the tasks!
… but you need a handle on them; ideally we need handles as soon as an exception is raised. Now, as we mentioned before, gather returns the results of the tasks, but wait returns the Task objects in two sets, done and pending.
Let’s give wait a go:
[17:20:04] Calculating comments for top 5 stories. (1)
[17:20:04] Waiting for 5 seconds...
Post ??? has 13 comments (1)
Error retrieving comments for top stories: BOOM!
[17:20:07] > Calculating comments took 3.79 seconds and 501 fetches
[17:20:09] Calculating comments for top 5 stories. (2)
[17:20:09] Waiting for 5 seconds...
Post ??? has 13 comments (2)
Post ??? has 82 comments (2)
Error retrieving comments for top stories: BOOM!
[17:20:13] > Calculating comments took 4.05 seconds and 501 fetches
[17:20:14] Calculating comments for top 5 stories. (3)
A few things to note on this example. First off, the signature for wait is different: it accepts a list of Task objects, so we don’t need to unpack the list as we were doing before with gather.
Secondly, we’re passing return_when=FIRST_EXCEPTION, specifically requesting it to return the two sets as soon as there’s an exception. At that point there will be pending tasks, so we cancel them. If no exception is raised, the second set will simply be empty. That’s why we managed to stop immediately after hitting our upper limit for fetches (500 in this example).
However, remember done != successful. If an exception was raised, then at least one Task object in done holds that exception as its result, and calling result() on it will re-raise it, so we need to be prepared to catch it.
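A condensed sketch of this flow, with a hypothetical comments_for coroutine whose delays are chosen so that one task succeeds, one fails and one is still running at the moment of failure. The coroutines are wrapped with ensure_future first, since recent Python versions (3.11 and later) no longer accept bare coroutine objects in wait:

```python
import asyncio


class BoomException(Exception):
    """Stand-in for the error raised by the article's URLFetcher."""


async def comments_for(post_id, delay, fail=False):
    # Hypothetical stand-in for the per-post coroutine.
    await asyncio.sleep(delay)
    if fail:
        raise BoomException('BOOM!')
    return post_id


async def main():
    tasks = [
        asyncio.ensure_future(comments_for(1, 0.01)),
        asyncio.ensure_future(comments_for(2, 0.1, fail=True)),
        asyncio.ensure_future(comments_for(3, 5)),  # still running at failure
    ]
    done, pending = await asyncio.wait(
        tasks, return_when=asyncio.FIRST_EXCEPTION)

    # Stop sending requests: cancel whatever has not finished yet.
    for task in pending:
        task.cancel()

    results, errors = [], []
    for task in done:
        try:
            results.append(task.result())  # re-raises if the task failed
        except BoomException as e:
            errors.append(e)
    return results, errors, len(pending)


if __name__ == '__main__':
    results, errors, cancelled = asyncio.run(main())
    print(f'results={results} errors={errors} cancelled={cancelled}')
```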
Now for the caveats. Notice the ??? in the post numbers? That’s because up until now we’ve been relying on gather returning the results in order, but we’ve lost that by using wait.
What we need to do is keep track of which Task object corresponds to each post ID. Up until now we’ve been allowing gather and wait to create these Task objects from coroutine objects, but we can actually create the Task objects ourselves and pass the list to wait.
[17:35:33] Calculating comments for top 5 stories. (1)
[17:35:33] Waiting for 5 seconds...
Error retrieving comments for top stories: BOOM!
Post 15051645 has 86 comments (1)
Post 15052192 has 16 comments (1)
[17:35:37] > Calculating comments took 4.06 seconds and 521 fetches
As you can see we can now get the post ID corresponding to our completed Task object from the set returned by wait.
Notice how we’re creating Task objects using ensure_future (we could’ve also used loop.create_task) and then combining them using wait. Keeping references to our Task objects can be quite handy.
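One way to keep those references is a dictionary keyed by Task object, as in this sketch. The names comments_for and task_to_post are hypothetical, and the comment counts are made up:

```python
import asyncio


async def comments_for(post_id):
    # Hypothetical stand-in: returns a made-up comment count.
    await asyncio.sleep(0)
    return post_id * 10


async def main(post_ids):
    # Create the Task objects ourselves and remember which post each is for.
    task_to_post = {
        asyncio.ensure_future(comments_for(post_id)): post_id
        for post_id in post_ids
    }
    done, pending = await asyncio.wait(list(task_to_post))
    # The done set is unordered, so look each Task up in the mapping.
    return {task_to_post[task]: task.result() for task in done}


if __name__ == '__main__':
    for post_id, count in sorted(asyncio.run(main([1, 2, 3])).items()):
        print(f'Post {post_id} has {count} comments')
```

Task objects are hashable, so they work as dictionary keys without any extra bookkeeping.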
Handling cancellation
Since we’re talking about cancelling tasks, the actual workflow is quite interesting. When task.cancel() is called, a CancelledError exception is thrown into the coroutine. The coroutine can actually catch that exception and act accordingly; it may even choose to ignore it.
In our case we wouldn’t want to ignore the cancellation entirely, but it might be useful to catch the exception to cancel all child tasks if we have any.
[18:29:54] Calculating comments for top 5 stories. (1)
[18:29:54] Waiting for 5 seconds…
Post 13851386 has 1 comments (1)
Post 13851349 has 2 comments (1)
Error retrieving comments for top stories: BOOM!
[18:29:55] > Calculating comments took 1.76 seconds and 151 fetches
[18:29:55] Comments for post 13851706 cancelled, cancelling 3 child tasks
[18:29:55] Comments for post 13852103 cancelled, cancelling 1 child tasks
[18:29:55] Comments for post 13851611 cancelled, cancelling 2 child tasks
[... more messages like the above ...]
[18:29:59] Calculating comments for top 5 stories. (2)
[18:29:59] Waiting for 5 seconds…
The lesson here is that if there’s a chance that your coroutines can be cancelled remember you can catch the CancelledError and do any clean up that’s necessary. This also highlights the usefulness of storing references to any Tasks you schedule.
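A minimal sketch of that clean-up, assuming a parent coroutine that waits on its child tasks with asyncio.wait (which does not cancel them on its own). The names fetch_comment and comments_for are hypothetical:

```python
import asyncio


async def fetch_comment(comment_id):
    # Hypothetical child task: pretend to fetch one comment, slowly.
    await asyncio.sleep(5)
    return comment_id


async def comments_for(post_id, child_ids, log):
    children = [asyncio.ensure_future(fetch_comment(c)) for c in child_ids]
    try:
        await asyncio.wait(children)
        return [child.result() for child in children]
    except asyncio.CancelledError:
        log.append(f'Comments for post {post_id} cancelled, '
                   f'cancelling {len(children)} child tasks')
        for child in children:
            child.cancel()
        raise  # re-raise so this task is actually marked as cancelled


async def main():
    log = []
    parent = asyncio.ensure_future(comments_for(1, [10, 11, 12], log))
    await asyncio.sleep(0.01)  # let the parent start its children
    parent.cancel()
    try:
        await parent
    except asyncio.CancelledError:
        pass
    return log, parent.cancelled()


if __name__ == '__main__':
    log, cancelled = asyncio.run(main())
    print(log[0])
    print('parent cancelled:', cancelled)
```

Re-raising after the clean-up matters: a coroutine that swallows CancelledError keeps running, and its task will not be reported as cancelled.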
I find the solution quite elegant and Pythonic: exceptions are at the core of the language, and they still are even in a feature as complicated as cancelling asynchronous tasks.
Conclusion
And that’s all I have on asyncio. I hope these two articles have satisfied your hunger for coroutine knowledge and that you’ve learned a few ideas to keep in mind while working with asyncio.
As you can see there’s quite a bit of housekeeping to perform, especially in situations where you want or need to deviate from async/await and start using different patterns. Asyncio has been the subject of detailed analysis from very clever people and has sparked third-party libraries that try different approaches or aim to palliate our suffering.
Personally I think it is not without its flaws, but it’s had a tremendous impact in getting the community to embrace asynchronous programming. It’s really in its infancy, and the core team is hard at work adding new features and polishing the API, while the ecosystem around it has flourished and is only getting better and better.
I believe asyncio has, without a doubt, pushed Python to the next level.