Asynchronous Python

What is Asynchronous Programming?

“Asynchrony, in computer programming, refers to the occurrence of events independent of the main program flow and ways to deal with such events.

These may be “outside” events such as the arrival of signals, or actions instigated by a program that take place concurrently with program execution, without the program blocking to wait for results.” - Wikipedia

One can also think of asynchronous programming as the programmer handing control of execution over to the computer: you describe the work, and the runtime decides when each piece runs.

Why care?

We want our program to run faster and spend less time hanging.

It is important to note that not all subroutines can benefit from, or even be run, concurrently: sometimes task A has to finish before task B can start. It is also worth noting that these implementations will be more difficult to write than serial ones.

Asynchronous programs are good for scalability: you are usually interacting with many servers you need to wait on, and asynchrony lends itself perfectly to overlapping that waiting.

Scenarios

With Python there are really two main reasons for using asynchronous programming (I have included relevant packages for these use cases).

  1. Doing more at once, using asyncio or threads.
  2. Doing things faster, using multiprocessing or C/Cython.

To make these easier, one can also use trio or unsync.

Why don’t threads add computational speed?

Because of a memory management feature called the GIL (Global Interpreter Lock): only one thread can execute Python bytecode at a time.

But the details are not very important for this topic.
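A quick way to see the GIL in action (a minimal sketch; exact timings vary by machine): two threads doing pure CPU work take about as long as doing the same work serially, because they take turns holding the lock.

```python
import threading
import time


def count_down(n: int) -> None:
    # Pure CPU work: the GIL lets only one thread
    # execute Python bytecode at a time.
    while n > 0:
        n -= 1


N = 2_000_000

# Serial: one call after the other.
start = time.perf_counter()
count_down(N)
count_down(N)
serial_time = time.perf_counter() - start

# Threaded: two threads, but they must take turns holding
# the GIL, so total time is roughly the same (often worse).
start = time.perf_counter()
t1 = threading.Thread(target=count_down, args=(N,))
t2 = threading.Thread(target=count_down, args=(N,))
t1.start(); t2.start()
t1.join(); t2.join()
threaded_time = time.perf_counter() - start

print(f"serial:   {serial_time:.2f}s")
print(f"threaded: {threaded_time:.2f}s")
```

Threads still help when the work is waiting on I/O rather than the CPU, because a thread releases the GIL while it blocks.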

Doing More at Once

Using asyncio we are going to break the work into many tasks; the steps inside each task still run in order, but the tasks themselves can be interleaved.

Code

Generators and their Similarity to Asynchronous Programming

Classic Fibonacci CS101 function:


from typing import List


def fib(n: int) -> List[int]:
    numbers = []
    current, nxt = 0, 1
    while len(numbers) < n:
        current, nxt = nxt, current + nxt
        numbers.append(current)

    return numbers

result = fib(50) # List of Fib numbers

for n in result:
    print(n, end=', ')
    if n > 10000:
        break

The same method, but now as a generator: we are breaking our work up into chunks, delimited by the yield keyword. Similar to asynchronous programming, the generator is resumed multiple times, once for each next value.


def fib():
    current, nxt = 0, 1
    while True:
        current, nxt = nxt, current + nxt
        yield current

result = fib() # Instantiating the generator

for n in result:
    print(n, end=', ')
    if n > 10000:
        break
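The resemblance goes even further: before async/await, Python built coroutines directly on generators. A minimal sketch (the running-average example is a classic illustration, not from the lecture): the caller pushes values into a paused generator with .send(), much like an event loop resumes a paused coroutine.

```python
def averager():
    # A generator-based "coroutine": it pauses at each yield,
    # and the caller resumes it by sending in a new value.
    total = 0.0
    count = 0
    average = None
    while True:
        value = yield average  # pause here until .send() resumes us
        total += value
        count += 1
        average = total / count


avg = averager()
next(avg)           # prime the generator: run up to the first yield
r1 = avg.send(10)   # 10.0
r2 = avg.send(20)   # 15.0
r3 = avg.send(5)    # 11.666...
print(r1, r2, r3)
```

The call to next() is needed once to advance the generator to its first yield before any value can be sent in.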

Anatomy of an async method

import asyncio

# Begin by making the method async.
async def process_data(num: int, data: asyncio.Queue):
    processed = 0

    while processed < num:
        # Await all async methods you call.
        item = await data.get()

        # PROCESS DATA
        processed += 1
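To make the sketch above runnable, here is a minimal, self-contained version. The producer coroutine and the doubling "processing" step are assumptions added for illustration; the consumer follows the shape of process_data above.

```python
import asyncio


async def produce_data(num: int, data: asyncio.Queue):
    for i in range(num):
        await data.put(i)
        await asyncio.sleep(0)  # hand control back to the event loop


async def process_data(num: int, data: asyncio.Queue):
    processed = []
    while len(processed) < num:
        item = await data.get()     # suspends until an item is available
        processed.append(item * 2)  # stand-in for real processing
    return processed


async def main():
    queue = asyncio.Queue()
    # Run producer and consumer concurrently on the same loop.
    _, results = await asyncio.gather(
        produce_data(5, queue),
        process_data(5, queue),
    )
    return results


results = asyncio.run(main())
print(results)  # [0, 2, 4, 6, 8]
```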

Real Life Example

We are going to fetch pages from a website used for getting information about podcast episodes. Once we have the HTML for an episode, we parse out its title.

Synchronous Version


import requests
import bs4
from colorama import Fore


def main():
    get_title_range()
    print("Done.")


def get_html(episode_number: int) -> str:
    # Get website HTML
    print(Fore.YELLOW + f"Getting HTML for episode {episode_number}", flush=True)

    url = f'https://talkpython.fm/{episode_number}'
    resp = requests.get(url)
    resp.raise_for_status()

    return resp.text


def get_title(html: str, episode_number: int) -> str:
    # Get podcast title from HTML
    print(Fore.CYAN + f"Getting TITLE for episode {episode_number}", flush=True)
    soup = bs4.BeautifulSoup(html, 'html.parser')
    header = soup.select_one('h1')
    if not header:
        return "MISSING"

    return header.text.strip()


def get_title_range():
    # Please keep this range pretty small to not DDoS my site. ;)

    # Range of episodes to get, n
    for n in range(185, 200):
        html = get_html(n)
        title = get_title(html, n)
        print(Fore.WHITE + f"Title found: {title}", flush=True)


if __name__ == '__main__':
    main()

Asynchronous Version

import asyncio
from asyncio import AbstractEventLoop

import aiohttp
import requests
import bs4
from colorama import Fore


def main():
    # Create loop
    loop = asyncio.get_event_loop()

    # Run Event loop
    loop.run_until_complete(get_title_range(loop))

    print("Done.")


async def get_html(episode_number: int) -> str:
    print(Fore.YELLOW + f"Getting HTML for episode {episode_number}", flush=True)

    # Make this async with aiohttp's ClientSession
    url = f'https://talkpython.fm/{episode_number}'
    # resp = await requests.get(url)
    # resp.raise_for_status()

    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            resp.raise_for_status()

            html = await resp.text()
            return html


def get_title(html: str, episode_number: int) -> str:
    print(Fore.CYAN + f"Getting TITLE for episode {episode_number}", flush=True)
    soup = bs4.BeautifulSoup(html, 'html.parser')
    header = soup.select_one('h1')
    if not header:
        return "MISSING"

    return header.text.strip()


async def get_title_range(loop: AbstractEventLoop):
    # Please keep this range pretty small to not DDoS my site. ;)
    tasks = []
    for n in range(190, 200):
        tasks.append((loop.create_task(get_html(n)), n))

    for task, n in tasks:
        html = await task
        title = get_title(html, n)
        print(Fore.WHITE + f"Title found: {title}", flush=True)


if __name__ == '__main__':
    main()

Some Common Questions

Does speed change on different Operating Systems?

Tests would have to be run to check this, but more interestingly, instead of the normal asyncio event loop one can use uvloop.

uvloop can be significantly faster than the standard asyncio event loop but, according to the JetBrains lecture these notes are based on, it does not work on Windows.

asyncio vs Threading in terms of learning curve?

Threading is harder, by about three times.

Book Recommendations

“Fluent Python” has a good section, although it is out of date (Python 3.7 brought a lot of updates).

Can asyncio and multiprocessing be used together?

Yes; they are for different use cases. There is a package called multiprocessingio which combines them.

What is a coroutine?

A coroutine is similar to a generator function: it runs up to a point, pauses, and can be resumed later, returning interim values along the way.

But look it up for more details.

What if things are not async compatible?

Short answer: Can try using threads.
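For example, asyncio.to_thread (Python 3.9+) runs a blocking call in a worker thread while the event loop keeps going. A minimal sketch, using time.sleep as a stand-in for a blocking call from a non-async library (the function name blocking_fetch is an assumption for illustration):

```python
import asyncio
import time


def blocking_fetch(n: int) -> str:
    # Stand-in for a library that is not async compatible
    # (e.g. requests): it blocks the calling thread.
    time.sleep(0.2)
    return f"result-{n}"


async def main():
    start = time.perf_counter()
    # asyncio.to_thread pushes each blocking call onto a worker
    # thread, so the event loop is free to run them concurrently.
    results = await asyncio.gather(
        asyncio.to_thread(blocking_fetch, 1),
        asyncio.to_thread(blocking_fetch, 2),
        asyncio.to_thread(blocking_fetch, 3),
    )
    elapsed = time.perf_counter() - start
    return results, elapsed


results, elapsed = asyncio.run(main())
print(results)            # ['result-1', 'result-2', 'result-3']
print(f"{elapsed:.2f}s")  # roughly 0.2s, not 0.6s: the waits overlapped
```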

Example: Making Faster API Calls

By using asynchronous programming, we can increase the speed at which our requests are executed through the concept of an “event loop”. An event loop is a program that initiates a function call or subroutine without waiting for it to finish. Instead, the event loop launches the next subroutine, and the one after that, until it has no more subroutines to initiate; only then does it come back and check whether the previously launched subroutines have returned.

By doing this we reduce the amount of time left just waiting for the subroutine to finish.
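A minimal sketch of that behaviour: two tasks are started in order, each await hands control back to the event loop, and the shorter task finishes first even though it was started second.

```python
import asyncio

order = []


async def job(name: str, delay: float):
    order.append(f"start {name}")
    # await hands control back to the event loop, which can
    # start the next job instead of blocking here.
    await asyncio.sleep(delay)
    order.append(f"end {name}")


async def main():
    await asyncio.gather(job("A", 0.2), job("B", 0.1))


asyncio.run(main())
print(order)  # ['start A', 'start B', 'end B', 'end A']
```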

Example Program:

Making faster api calls for financial data (taken from: [[https://www.youtube.com/watch?v=nFn4_nA_yk8][This YT Video]])


import asyncio
import aiohttp
import os
import time

# Important to note that requests is a synchronous
# library and cannot work asynchronously.

api_key = os.getenv('ALPHAVANTAGE_API_KEY')
url = 'https://www.alphavantage.co/query?function=OVERVIEW&symbol={}&apikey={}'  
tickers = ['AAPL', 'GOOG', 'TSLA', 'MSFT', 'AAPL'] 
results = []

# Timing!
start = time.time()

# async keyword tells python that this
# method can do asynchronous stuff. 
async def get_tickers_v1():
    """
    Request data from api using requests
    """
    # Context Manager for running an asynchronous
    # session.
    async with aiohttp.ClientSession() as session:
        for ticker in tickers:

            # Make API requests synchronously
            # response = requests.get(url.format(ticker, api_key))
            # results.append(response.json())

            # Make API requests asynchronously.
            # await suspends this coroutine until
            # the response is ready.
            response = await session.get(
                url.format(ticker, api_key)
            )

            results.append(await response.json())


async def get_tickers_v2():
    def get_tasks(session):
        tasks = []
        for ticker in tickers:
            tasks.append(
                # asyncio.create_task puts the subroutine
                # on the event loop
                asyncio.create_task(
                    session.get(
                        url.format(ticker, api_key), ssl=False
                    )
                )
            )
        return tasks

    async with aiohttp.ClientSession() as session:
        tasks = get_tasks(session)
        # asyncio.gather waits until every task on the
        # event loop has completed, then returns their
        # results in order.
        responses = await asyncio.gather(*tasks)
        for response in responses:
            results.append(await response.json())


# Event loop - Long Version
# loop = asyncio.get_event_loop()
# loop.run_until_complete(get_tickers_v1())
# loop.close()


# Event loop - Short Version
asyncio.run(get_tickers_v2())

# Timing!
end = time.time()
total_time = end - start
print(f"That took {total_time} seconds")

References

[[https://www.youtube.com/watch?v=F19R_M4Nay4&t=3s&ab_channel=JetBrains][Demystifying Python’s Async and Await Keywords, by JetBrains Youtube]]