Asyncio

11 Jan 2020

asyncio

1. Introduction to coroutines

Coroutines are computer program components that generalize subroutines for non-preemptive multitasking, by allowing execution to be suspended and resumed. This means that, unlike threads, which can be preempted by the OS, coroutines are not preempted from outside but give up control themselves. This avoids any requirement for synchronisation primitives such as locks or semaphores.
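
For example, two coroutines can update a shared counter without a lock, because control can only switch at an await point (a minimal sketch, not related to the crawler example later in this post):

import asyncio

counter = 0

async def increment(n):
    global counter
    for _ in range(n):
        counter += 1             # never interrupted mid-update: switches happen only at await
        await asyncio.sleep(0)   # voluntarily hand control back to the event loop

async def main():
    await asyncio.gather(increment(1000), increment(1000))
    print(counter)               # always 2000, no lock or semaphore needed

asyncio.run(main())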

2. The big picture


Consider an event loop with two queues, ready and waiting. We create a new task/coroutine to perform an IO operation or configure a remote device and put it in the ready queue. Initially, execution control is with the event loop, which passes it to the new task in the ready queue. The task starts execution. Whenever the task needs to wait for results, it moves to the waiting queue and gives control back to the event loop. The event loop then gives control to the next task in the ready queue. Meanwhile, when the waiting task gets its result, it is moved from the waiting queue back to the ready queue.
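
A rough sketch of this flow, with comments mapping to the two queues described above (asyncio's internals are more involved than two literal queues):

import asyncio

async def configure_device(name, delay):
    print(f"{name}: running")      # this task is at the front of the ready queue
    await asyncio.sleep(delay)     # task moves to the waiting queue, control goes back to the event loop
    print(f"{name}: resumed")      # result is in, task was moved back to the ready queue and resumed

async def main():
    await asyncio.gather(
        configure_device("task-1", 1),
        configure_device("task-2", 1),
    )

asyncio.run(main())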

3. Async and await syntax

3.1 Starting coroutines sequentially

import asyncio
import time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)

async def main():                    # async def turns this function into a coroutine
    print(f"started at {time.strftime('%X')}")

    await say_after(1, 'hello')      # await the IO-bound calls; here they run one after the other
    await say_after(2, 'world')

    print(f"finished at {time.strftime('%X')}")


# Python 3.7+
asyncio.run(main())                  # this gets the event loop and runs the coroutine

The asyncio.run() function runs the passed coroutine, taking care of managing the asyncio event loop.
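
On Python versions older than 3.7 there is no asyncio.run, and the rough equivalent is to manage the event loop yourself:

# Python < 3.7: obtain and drive the event loop manually
loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main())
finally:
    loop.close()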

3.2 Starting coroutines concurrently

import asyncio
import time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)
    
async def main():
    task1 = asyncio.create_task(
        say_after(1, 'hello'))

    task2 = asyncio.create_task(
        say_after(2, 'world'))

    print(f"started at {time.strftime('%X')}")
    await asyncio.gather(task1, task2)   
    print(f"finished at {time.strftime('%X')}")
    
asyncio.run(main())

asyncio.gather runs the tasks concurrently and waits until both are finished, so the whole thing takes about 2 seconds instead of 3.
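
As a side note, asyncio.gather also accepts coroutines directly and wraps them in tasks itself, so the explicit create_task calls can be dropped (a small variation of the example above, reusing its imports and say_after):

async def main():
    print(f"started at {time.strftime('%X')}")
    # gather schedules both coroutines as tasks and waits for them together,
    # so this version also finishes in about 2 seconds
    await asyncio.gather(
        say_after(1, 'hello'),
        say_after(2, 'world'),
    )
    print(f"finished at {time.strftime('%X')}")

asyncio.run(main())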

4. Example

In this example, we will try to fetch multiple HTTP requests in an interleaving fashion. Suppose you have to create a crawler which fetches the content of a given URL (called baseurl in the code) and of the URLs found in baseurl's HTML, recursively, until we reach a given depth. Eventually we write the content of each URL to a file.

Synchronous code:

import sys

import httpx

# find_links, write_content and the COUNT counter are defined elsewhere in the crawler
def get_content(link):
    page = httpx.get(link)
    return page.text

def url_logic(url, i, depth, destination):
    urls = []
    if i == depth or not url.startswith('http'):
        return
    ## get the html page
    data = get_content(url)

    ## find out the links in the page and add it to a list
    if depth > 0:
        find_links(data, urls)
    else:
        sys.exit(1)

    ## print in the file (url,depth,content)
    global COUNT
    COUNT = write_content(url, str(depth), data, destination, COUNT)
   
    for u in urls:
        url_logic(u, i+1, depth, destination)



$ time python3.8 crawler.py http://ankit-rana.github.io/blog/2020/01/11/asyncio ./data/ 3

real	0m29.915s
user	0m6.503s
sys	    0m0.185s


By adding user and sys, you get a sense of how much time was spent on the CPU. The difference between that and real tells you roughly how much time was spent waiting for IO.

Asynchronous code:

import asyncio
import sys

import httpx

async def read_content(link):
    # asyncio will not really work if you are not using a library which supports async
    async with httpx.AsyncClient() as client:
        result = await client.get(link)
    return result.text

async def url_logic(url, i, depth, destination):
    urls = []
    if i == depth:
        return
    if not url.startswith('http'):
        return
    ## get the html page
    print("%s step1"%url)
    data = await read_content(url)

    ## find out the links in the page and add it to a list
    print("%s step2"%url)
    if depth > 0:
        find_links(data, urls)
    else:
        sys.exit(1)

    ## print in the file (url,depth,content)
    global COUNT
    print("%s step3"%url)
    COUNT = write_content(url, str(depth), data, destination, COUNT)

    tasks = []
    for u in urls:
        tasks.append(asyncio.create_task(url_logic(u, i+1, depth, destination)))

    await asyncio.gather(*tasks)
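
The entry point is left out of the snippet; based on the command line below, it would look roughly like this (the argument order is assumed here, not taken from the original script):

if __name__ == '__main__':
    baseurl, destination, depth = sys.argv[1], sys.argv[2], int(sys.argv[3])
    asyncio.run(url_logic(baseurl, 0, depth, destination))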



$ time python3.8 crawler.py http://ankit-rana.github.io/blog/2020/01/11/asyncio ./data/ 3
real	0m7.380s
user	0m6.494s
sys	0m0.179s

5. Exception Handling

In the above example, I found that as the depth increased, the number of links grew and some error messages started appearing:

Executing <Task cancelled name='Task-14759' coro=<url_logic() done, defined at crawler.py:8> created at /usr/lib64/python3.8/asyncio/tasks.py:382> took 0.102 seconds


After debugging, I found that my logic to fetch the HTML page content, read_content, was raising exceptions:

async def read_content(link):
    # asyncio will not really work if you are not using a library which supports async
    async with httpx.AsyncClient() as client:
        result = await client.get(link)
    return result.text


So I added exception handling like below:

async def read_content(link):
    # asyncio will not really work if you are not using a library which supports async
    client = httpx.AsyncClient()   # create the client outside try so it is always defined in finally
    try:
        result = await client.get(link)
    except Exception:
        # on any failure, return an empty page instead of letting the task die
        return ''
    finally:
        await client.aclose()
    return result.text

You might want to handle each coroutine's return value in the main program too. For that, use the return value of await asyncio.gather(*tasks), as sketched below.
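
For instance, something along these lines (a sketch, not part of the crawler; fetch_all and the urls list are illustrative names, and read_content is the coroutine defined above):

import asyncio

async def fetch_all(urls):
    tasks = [asyncio.create_task(read_content(u)) for u in urls]
    # results come back in the same order as the tasks were passed in;
    # return_exceptions=True turns a raised exception into a returned value,
    # so one failing coroutine does not cancel the others
    results = await asyncio.gather(*tasks, return_exceptions=True)
    for url, result in zip(urls, results):
        if isinstance(result, Exception):
            print(f"{url} failed: {result!r}")
        else:
            print(f"{url}: fetched {len(result)} characters")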

6. Appendix

6.1 Python Official Doc.

6.2 Michael Kennedy's course on asynchronous programming in Python

6.3 Example’s Asyncio Code

6.4 Video by Lukasz Langa