asyncio
1. Introduction to coroutines
Coroutines are computer program components that generalize subroutines for non-preemptive multitasking by allowing execution to be suspended and resumed. This means that, unlike threads, which can be preempted by the OS at any point, coroutines are never interrupted from the outside; they give up control themselves at explicit points. Because a coroutine can only be switched out where it chooses to yield, this largely removes the need for synchronization primitives such as locks or semaphores.
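For instance, the small sketch below (not from the original post) lets two coroutines update a shared counter without any lock: each coroutine can only be suspended at its own await, so the lines between awaits always run without interruption.

import asyncio

counter = 0  # shared state, deliberately left unprotected

async def worker(name, delay):
    global counter
    for _ in range(3):
        await asyncio.sleep(delay)             # the coroutine suspends itself here, voluntarily
        counter += 1                           # no await between these two lines, so no other
        print(f"{name}: counter = {counter}")  # coroutine can run in between; no lock needed

async def main():
    await asyncio.gather(worker("a", 0.1), worker("b", 0.15))

asyncio.run(main())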
2. The big picture
Consider an event loop with two queues: ready and waiting. We create a new task/coroutine to perform an IO operation or configure a remote device and put it in the ready queue. Initially, execution control is with the event loop, which passes it to the first task in the ready queue. The task starts executing. Whenever the task needs to wait for a result, it moves to the waiting queue and gives control back to the event loop. The event loop then gives control to the next task in the ready queue. Meanwhile, when the waiting task gets its result, it is moved from the waiting queue back to the ready queue.
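The tiny demo below (task names and delays are made up for illustration) makes this hand-off visible: awaiting asyncio.sleep() stands in for the IO wait that parks a task in the waiting queue and hands control back to the event loop.

import asyncio

async def fake_io_task(name, io_time):
    print(f"{name}: on the ready queue, running, about to wait for IO")
    await asyncio.sleep(io_time)   # task moves to the waiting queue; the event loop runs someone else
    print(f"{name}: IO done, moved back to the ready queue and resumed")

async def main():
    # both tasks start on the ready queue; the event loop alternates between them
    await asyncio.gather(fake_io_task("task-1", 1), fake_io_task("task-2", 2))

asyncio.run(main())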
3. Async and await syntax
3.1 Starting coroutines sequentially
import asyncio
import time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)

async def main():  # async def makes this function a coroutine
    print(f"started at {time.strftime('%X')}")
    await say_after(1, 'hello')  # await marks the statements that can go off and do IO-bound work
    await say_after(2, 'world')
    print(f"finished at {time.strftime('%X')}")

# Python 3.7+
asyncio.run(main())  # this gets the event loop and runs the coroutine
The asyncio.run() function runs the passed coroutine, taking care of managing the asyncio event loop.
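Because the two say_after() calls are awaited one after the other, main() takes about 1 + 2 = 3 seconds in total. The output looks like this (timestamps here are illustrative):

started at 10:00:00
hello
world
finished at 10:00:03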
3.2 Starting coroutines concurrently
import asyncio
import time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)

async def main():
    task1 = asyncio.create_task(
        say_after(1, 'hello'))
    task2 = asyncio.create_task(
        say_after(2, 'world'))
    print(f"started at {time.strftime('%X')}")
    await asyncio.gather(task1, task2)
    print(f"finished at {time.strftime('%X')}")

asyncio.run(main())
asyncio.create_task() schedules each coroutine on the event loop, and asyncio.gather() then runs the tasks concurrently and waits until all of them are done.
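This time the two sleeps overlap, so the program finishes after about 2 seconds (the longest delay) instead of 3; the output, again with illustrative timestamps:

started at 10:00:00
hello
world
finished at 10:00:02

asyncio.gather() also accepts plain coroutines and wraps them in tasks itself, so await asyncio.gather(say_after(1, 'hello'), say_after(2, 'world')) inside main() would behave the same way.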
4. Example
In this example, we will fetch multiple HTTP resources in an interleaved fashion. Suppose you have to build a crawler which downloads the content of a given URL (called baseurl in the code) and then, recursively, the URLs found in that page's HTML, until a given depth is reached. Eventually we write the content of each URL to a file.
Synchronous version:
def get_content(link):
    page = httpx.get(link)
    return page.text

def url_logic(url, i, depth, destination):
    urls = []
    if i == depth or not url.startswith('http'):
        return
    ## get the html page
    data = get_content(url)
    ## find out the links in the page and add them to a list
    find_links(data, urls) if depth > 0 else sys.exit(1)
    ## print in the file (url, depth, content)
    global COUNT
    COUNT = write_content(url, str(depth), data, destination, COUNT)
    for u in urls:
        url_logic(u, i + 1, depth, destination)
$ time python3.8 crawler.py http://ankit-rana.github.io/blog/2020/01/11/asyncio ./data/ 3
real 0m29.915s
user 0m6.503s
sys 0m0.185s
By adding user and sys together, you get a sense of how much time was actually spent on the CPU. The gap between that sum and real tells you roughly how long the process sat waiting for IO.
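For the run above:

user + sys = 6.503 + 0.185 = 6.688 s of CPU time
real - (user + sys) = 29.915 - 6.688 ≈ 23.2 s spent blocked, mostly waiting on the network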
Asynchronous version:
async def read_content(link):
    # asyncio will not really help if you are not using a library which supports async
    async with httpx.AsyncClient() as client:
        result = await client.get(link)
        return result.text

async def url_logic(url, i, depth, destination):
    urls = []
    if i == depth:
        return
    if not url.startswith('http'):
        return
    ## get the html page
    print("%s step1" % url)
    data = await read_content(url)
    ## find out the links in the page and add them to a list
    print("%s step2" % url)
    find_links(data, urls) if depth > 0 else sys.exit(1)
    ## print in the file (url, depth, content)
    global COUNT
    print("%s step3" % url)
    COUNT = write_content(url, str(depth), data, destination, COUNT)
    tasks = []
    for u in urls:
        tasks.append(asyncio.create_task(url_logic(u, i + 1, depth, destination)))
    await asyncio.gather(*tasks)
$ time python3.8 crawler.py http://ankit-rana.github.io/blog/2020/01/11/asyncio ./data/ 3
real 0m7.380s
user 0m6.494s
sys 0m0.179s
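The snippets above leave out the script's entry point. A minimal sketch of what the bottom of the async crawler.py might look like, assuming the argument order used on the command line (base URL, destination directory, depth); this argument handling is hypothetical and not taken from the original code:

import sys
import asyncio

if __name__ == '__main__':
    baseurl = sys.argv[1]       # e.g. the blog URL passed above
    destination = sys.argv[2]   # e.g. ./data/
    depth = int(sys.argv[3])    # e.g. 3
    # start the recursion at level 0; asyncio.run() sets up and tears down the event loop
    asyncio.run(url_logic(baseurl, 0, depth, destination))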
5. Exception Handling
In the above example, I found that as I increased the depth, the number of links grew and some error messages started appearing:
Executing <Task cancelled name='Task-14759' coro=<url_logic() done, defined at crawler.py:8> created at /usr/lib64/python3.8/asyncio/tasks.py:382> took 0.102 seconds
After debugging, I found that my logic to fetch the HTML page content, read_content, was raising exceptions:
async def read_content(link):
    # asyncio will not really help if you are not using a library which supports async
    async with httpx.AsyncClient() as client:
        result = await client.get(link)
        return result.text
So I added exception handling like below:
async def read_content(link):
    # asyncio will not really help if you are not using a library which supports async
    try:
        client = httpx.AsyncClient()
        result = await client.get(link)
    except Exception:
        return ''
    finally:
        await client.aclose()
    return result.text
You might want to handle each coroutine's return value (or exception) in the calling code too. For that, use the return value of await asyncio.gather(*tasks).
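A minimal sketch, reusing the tasks list built inside url_logic() above (url_logic() itself returns None, so this matters mostly when your coroutines return data or raise). Passing return_exceptions=True makes gather() put exceptions into the results list instead of propagating the first one to the caller:

results = await asyncio.gather(*tasks, return_exceptions=True)
for result in results:
    if isinstance(result, Exception):
        print('a sub-task failed:', repr(result))
    else:
        # whatever the coroutine returned (None for url_logic above)
        print('a sub-task returned:', result)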