Understanding the magic of async/await in Python

Vardan Aloyan
20 min read · Sep 30, 2023

Concurrent python applications

Audience

A complete understanding of this article requires an intermediate-to-advanced knowledge of the Python programming language.

Motivation

The idea of writing this article came into my mind when I spent some hours searching on the internet about the evolution of concurrent programming in Python, i.e., the comparison between the yield from expression and async/await keywords or what is happening in the event loop under the hood, and I couldn’t find good articles explaining all of these questions with real-life examples. There were some articles that were trying to explain by relying on primitive examples (mainly relying on time.sleep or asyncio.sleep), but these examples were not enough for deep understanding. So after spending several days reading some PEP proposals and taking some highlights from various articles and books, I got my main questions answered. After all of this research, I want to share with the community what I’ve learned so far, so I decided to write this article.

Understanding the concepts like Parallelism and Concurrency

First of all, I find it useful to briefly go through “similar” concepts like parallelism and concurrency.

Let’s start with parallelism.

Parallelism is the ability of a program to run several tasks at the same time.

To achieve parallelism in Python, the multiprocessing library can be used.

The following diagram is an illustration of a Python application that uses three processes for some computational tasks.

Multiprocessing in Python

As the diagram shows, each sub-process runs a separate Python interpreter with its own isolated memory. Sharing data between processes requires some additional effort (Pipe, Queue, etc.).

In this case, the operating system (OS) is responsible for process management. The OS decides which process is active at a given point in time and which one is idle. In other words, the OS is responsible for context switching.

In the multithreading world, the picture is a bit different.

Multithreading in Python is considered a way of achieving concurrency rather than parallelism because of the GIL (Global Interpreter Lock).

The diagram below illustrates a multi-threaded application in Python.

Multithreading in Python

Threads in Python run inside the same process as the main thread and, because of the GIL, effectively on a single core at a time; therefore, they share the same memory. Contrary to multiprocessing, here you have direct access to objects across threads.
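To make the shared-memory point concrete, here is a minimal sketch in which three threads append to one list that lives in the process memory. The names worker, shared_results, and lock are made up for this illustration and are not part of the application built later in the article.

```python
import threading

shared_results = []       # one list object, visible to every thread in the process
lock = threading.Lock()   # serializes access to the shared list


def worker(name, count):
    """Append `count` items to the shared list on behalf of thread `name`."""
    for i in range(count):
        with lock:
            shared_results.append((name, i))


threads = [
    threading.Thread(target=worker, args=(f"thread-{n}", 3)) for n in range(3)
]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(len(shared_results))  # 9: three threads, three items each
```

No Pipe or Queue is needed here because all threads see the same object; with multiprocessing, each process would get its own copy of the list.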

With regard to context switching, the Python interpreter (through the GIL) decides which thread is active and which is not.

Python threads belong to the concurrency concept because only one thread can be active at a single point in time.

According to the definition, in computer science, concurrency is the ability of different parts or units of a program, algorithm, or problem to be executed out-of-order or in partial order without affecting the outcome.

As we saw above, in multiprocessing the OS itself was responsible for context switching; in multithreading, it was the Python interpreter. What about being responsible for context switching ourselves, in software? This is where asynchronous programming comes in, which is another way of achieving concurrency.

Before going further, I would like to draw your attention to the diagram below, which I found on the internet (didn’t save the author, sorry), which represents the comparison of the above-mentioned techniques. I would leave it without a comment.

Comparison of multiprocessing vs multithreading vs asynchronous programming

To understand the magic of async/await, first we should understand some base concepts that exist in Python.

Generators

Generators came into Python with PEP-255. A generator in Python is a function that returns an iterator using the yield keyword. It also supports resuming execution from where it left off when iterating over it.

Here’s a small example of a generator function demonstrating the above-mentioned behavior.

def gprovider(n=10):
    for i in range(n):
        yield i

gen = gprovider()
print(gen) # <generator object gprovider at 0x1050c0040>
print(next(gen)) # 0
print(next(gen)) # 1
print(list(gen)) # [2, 3, 4, 5, 6, 7, 8, 9]

So every time we call next, it resumes the execution of the generator until the point when it executes the yield statement, then the execution is paused (the generator is suspended) and the control is passed to the caller (in this case, the caller is the main program).

We can follow the changes in the current state of the generator object using the built-in inspect module.

>>> from inspect import getgeneratorstate
>>> gen = gprovider()
>>> getgeneratorstate(gen)
'GEN_CREATED'
>>> next(gen)
0
>>> getgeneratorstate(gen)
'GEN_SUSPENDED'
>>> list(gen)
[1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> getgeneratorstate(gen)
'GEN_CLOSED'

A generator can be in one of four different states:

  1. 'GEN_CREATED' → Waiting to start execution
  2. 'GEN_RUNNING' → Currently being executed by the interpreter (it’s possible to see this state in multi-threaded application)
  3. 'GEN_SUSPENDED' → Currently suspended at a yield expression
  4. 'GEN_CLOSED' → Execution has completed
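The 'GEN_RUNNING' state can also be observed without threads, by asking for the state from inside the generator body while it executes. The sketch below walks through all four states; report_own_state and holder are hypothetical names used only for this illustration.

```python
from inspect import getgeneratorstate


def report_own_state(holder):
    # While this body executes, the generator object itself is running
    yield getgeneratorstate(holder["gen"])


holder = {}
holder["gen"] = report_own_state(holder)

states = [getgeneratorstate(holder["gen"])]       # before the first next()
states.append(next(holder["gen"]))                # observed from inside the body
states.append(getgeneratorstate(holder["gen"]))   # paused at the yield
holder["gen"].close()
states.append(getgeneratorstate(holder["gen"]))   # after close()

print(states)
# ['GEN_CREATED', 'GEN_RUNNING', 'GEN_SUSPENDED', 'GEN_CLOSED']
```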

With PEP-380, Python got a new language construct: yield from, a syntax that lets a generator delegate part of its operations to another generator (a subgenerator).

Although it does a lot more than yield, as we will see in this article, one of its features is that it enables complex generators to be refactored into smaller ones. Using the yield from syntax, we can “feed” our generator from sub-generators and from any kind of iterable.

Just a small example to illustrate it.

def gprovider(n=10):
    for i in range(n):
        yield i

def user_provider(n):
    gen = gprovider(n)
    yield from gen

>>> gen = user_provider(10)
>>> next(gen)
0
>>> list(gen)
[1, 2, 3, 4, 5, 6, 7, 8, 9]

yield from can be used to flatten different kinds of data structures.

def gen():
    yield from 'AB'
    yield from range(1, 3)

>>> g = gen()
>>> next(g)
'A'
>>> for i in g:
...     print(i)
...
B
1
2
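Flattening becomes more interesting when the structure is nested. As a small sketch, a generator can delegate to itself recursively with yield from; the flatten function below is a hypothetical helper written for this illustration, and it only descends into lists and tuples.

```python
def flatten(items):
    """Yield the leaves of arbitrarily nested lists and tuples, left to right."""
    for item in items:
        if isinstance(item, (list, tuple)):
            yield from flatten(item)   # delegate to a sub-generator for the nested part
        else:
            yield item


flat = list(flatten([1, [2, [3, 4]], (5, [6])]))
print(flat)  # [1, 2, 3, 4, 5, 6]
```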

Coroutines

In Python, coroutines are very similar to generators, with some differences in how they use yield statements.

Generators are primarily designed for data production during iteration, whereas coroutines possess the dual capacity for data production and consumption; in other words, coroutines are functions (generators) that can be stopped or resumed at specific points and can both produce and receive values at these points.

Below, you can see what a simple coroutine looks like.

def simple_coro(n=10):
    print("Coro started")
    for sent in range(n):
        received = yield sent
        print("Received:", received)
    print("Coro ended")

>>> coro = simple_coro()  # Initiating coroutine
>>> coro
<generator object simple_coro at 0x100dc2e30>
>>> next(coro)  # Priming or advancing the coroutine, the same as coro.send(None)
Coro started
0
>>> coro.send("Hello")
Received: Hello
1
>>> yielded_value = coro.send("World!")
Received: World!
>>> yielded_value
2
>>> next(coro)
Received: None
3
>>>

In order to make a coroutine ready for consuming data, we should first prime it. In other words, we should advance the interpreter to the place where the coroutine yields its first value and waits for data to be sent.

Let’s understand step-by-step what is happening in the above example using the following picture:

Explanation of coroutine execution

Point [1] corresponds to the moment when we created our coroutine object.

Point [2] corresponds to priming the coroutine, i.e., advancing it for the first time. Note that a coroutine can only be primed by calling next or by sending the None value. Sending any other value before priming the coroutine raises an exception.

>>> coro = simple_coro()
>>> coro.send("Coro is not primed yet")
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: can't send non-None value to a just-started generator
>>>

The points from [2] to [n] correspond to the remaining invocations of the coroutine until the iteration reaches its end; any further access to the coroutine raises a StopIteration exception (I’ll let you try this on your own).

Now let’s go back to the yield from but now from the point of view of the coroutine.

I suggest you go over the example of the averager, which is a really great example to understand yield from.

For now, let’s define several terms:

Delegating generator is the generator function that contains the yield from expression.

The subgenerator is the generator used inside the yield from expression (used by the delegating generator).

The caller is the code or function that calls the delegating generator.

One of the features that came with PEP-380 is that generators can return values: the returned value is attached to the StopIteration exception as its value attribute and automatically becomes the value of the yield from expression.

It seems a bit of a “hack” and hard to understand immediately, but this is how it’s implemented in Python.

Below, I’ll try to illustrate it with examples.

First, let’s see how the return value is getting attached as an exception value.

def sum_coro():
    _sum = 0
    while True:
        received = yield
        if received == 'END':
            break
        _sum += received
    return _sum

>>> adder = sum_coro()
>>> next(adder)
>>> adder.send(10)
>>> adder.send(1)
>>> adder.send(1)
>>> adder.send(1)
>>> adder.send("END")
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
StopIteration: 13

In the example above we see 13 is the exception value.

To understand it better, take a look at this example:

>>> adder = sum_coro()
>>> next(adder)
>>> adder.send(1)
>>> adder.send(2)
>>> adder.send(3)
>>> try:
...     adder.send("END")
... except StopIteration as ex:
...     print(ex.value)
...
6
>>>

Now let’s try to use sum_coro as a “subgenerator” and use it from “delegating generator” by calling it from “caller”, which would be our main function.

Our delegating coroutine looks like this:

def delegating_coro():
    SUM = yield from sum_coro()
    print("Return value of yield from expression is", SUM)
    yield SUM

The caller looks like this:

def main():
    data = [1, 2, 3, 4, 5, 6, 7, 8]
    gen = delegating_coro()
    next(gen)  # Advance to the yield
    for i in data:
        gen.send(i)
    # Sending 'END' breaks the loop inside the subgenerator; the resulting
    # StopIteration is handled by yield from, which receives the sum
    result = gen.send('END')
    print(result)

After executing main(), we see the following:

>>> main()
Return value of yield from expression is 36
36
>>>

With this example, we see that the main feature of yield from is to open a bidirectional channel from the outermost caller to the innermost subgenerator so that values can be sent and yielded back and forth directly from them.
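To see what this channel amounts to, here is a rough hand-written equivalent of `result = yield from subgen`. It is a deliberately simplified sketch: the real semantics in PEP-380 also forward throw() and close() to the subgenerator, which is omitted here. The names sum_until_end and delegate_manually are hypothetical.

```python
def sum_until_end():
    """Subgenerator: adds up the numbers sent to it until it receives 'END'."""
    total = 0
    while True:
        received = yield
        if received == "END":
            return total
        total += received


def delegate_manually(subgen):
    """Roughly what `result = yield from subgen` does (simplified)."""
    try:
        yielded = next(subgen)             # prime the subgenerator
    except StopIteration as stop:
        return stop.value
    while True:
        sent = yield yielded               # pass the value up, wait for the caller
        try:
            yielded = subgen.send(sent)    # forward the caller's value down
        except StopIteration as stop:
            return stop.value              # becomes the value of `yield from`


gen = delegate_manually(sum_until_end())
next(gen)
for number in (1, 2, 3):
    gen.send(number)
try:
    gen.send("END")
except StopIteration as stop:
    manual_total = stop.value

print(manual_total)  # 6
```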

The newer await keyword is very similar to yield from.

Different types of coroutines in Python

The coroutines that we have discussed so far are called classic coroutines. Those are coroutines that first appeared in Python.

Python 3.5 introduced native coroutines: coroutines that are defined using async def and consumed by awaiting.

There is also a third type, called generator-based coroutines: classic coroutines decorated with @types.coroutine, which makes them compatible with the native async/await syntax.
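As a small sketch of how the three kinds fit together, the generator-based coroutine below is awaited from a native coroutine, and the native coroutine is then driven by hand with send, exactly like a classic coroutine. The names suspend and native are made up for this illustration.

```python
import types


@types.coroutine
def suspend(value):
    """Generator-based coroutine: yields `value` to the driver, returns what is sent back."""
    received = yield value
    return received


async def native():
    """Native coroutine that awaits the generator-based one."""
    answer = await suspend("ping")
    return f"got {answer}"


coro = native()
first = coro.send(None)        # runs until the yield inside suspend()
try:
    coro.send("pong")          # resumes suspend(); native() then returns
except StopIteration as stop:
    result = stop.value

print(first, "/", result)  # ping / got pong
```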

Here the theoretical part of this article comes to an end. The practical part follows: constructing an asynchronous HTTP client using classic coroutines, relying only on the standard library.

Implementation of an asynchronous HTTP client

In order to implement an asynchronous HTTP client, we first need to understand how the HTTP protocol works. I will not go deep into that topic, but I will highlight some key points that are required for further understanding of the application.

In the following diagram, we see a basic illustration of an HTTP request and response. The diagram shows the flow from the point of view of the client.

So we can divide the flow illustrated in the diagram into several points:

  1. Connect: the first required operation. In order to interact with the server, we first need to connect to it.
  2. Write: when the connection is made and the server (socket) is ready to receive data, we write into it (this part can be called “HTTP Request”).
  3. Read: when the server receives our request, it starts to process it, and when it’s ready to provide the data (results) back to the client, we read from it (this part can be called “Receiving HTTP Response”).

Definition in picture

For now, those bullet points above are enough to continue further. For a detailed understanding of the HTTP protocol, there are a bunch of other materials on the internet. Also, I can suggest looking at my GitHub repository, where I implemented an HTTP client and server using the socket library (in a synchronous way).

So now let’s define the task which we want to solve. Definition of the task is the following:

Write an asynchronous HTTP client that can make asynchronous HTTP GET requests to REST APIs, which typically provide responses by specifying the Content-Length value in the HTTP response headers. The limitations (GET method only, Content-Length required) are there to simplify the solution, but the example remains realistic: we will construct a working solution whose limitations are easy to lift.
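Before touching any sockets, it may help to see the two pieces of HTTP that the task relies on: the text of a GET request and a response delimited by Content-Length. The sketch below works on plain bytes, so it needs no network; build_get_request and split_response are hypothetical helpers for this illustration, not the functions used in the client later on.

```python
def build_get_request(host, path="/"):
    """Return the bytes of a minimal HTTP/1.1 GET request."""
    return f"GET {path} HTTP/1.1\r\nHost: {host}\r\n\r\n".encode()


def split_response(raw):
    """Split a raw HTTP response into (status line, headers dict, body bytes)."""
    head, _, rest = raw.partition(b"\r\n\r\n")   # a blank line ends the headers
    status_line, *header_lines = head.decode("latin-1").split("\r\n")
    headers = {}
    for line in header_lines:
        name, _, value = line.partition(":")
        headers[name.strip().lower()] = value.strip()
    # Content-Length tells us how many body bytes belong to this response
    length = int(headers.get("content-length", len(rest)))
    return status_line, headers, rest[:length]


request = build_get_request("httpbin.org", "/anything")
raw_response = (
    b"HTTP/1.1 200 OK\r\n"
    b"Content-Type: application/json\r\n"
    b"Content-Length: 12\r\n"
    b"\r\n"
    b'{"ok": true}'
)
status_line, headers, body = split_response(raw_response)
print(status_line, headers["content-length"], body)
```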

Our expectation is to have the following behavior, which we will get at the end of the article.

Results

For demonstration purposes, we will use the following API:

https://httpbin.org/anything

httpbin.org is often used for testing and debugging purposes. It’s a versatile tool for developers to experiment with different HTTP request methods, headers, and other aspects of web communication to better understand and troubleshoot their applications.

Let’s look at the modules that we will use to implement our asynchronous HTTP client.

import json  # Used to parse the response body
import logging # Used to log the output
import selectors # High-level I/O multiplexing. Used in the event loop to wait for events on sockets.
import socket # Low level networking library. Used to create sockets.
import urllib.parse # Used to parse the URL, i.e. scheme, netloc, path, etc.
from collections import namedtuple # Used to store the response body and headers

Following are some objects that we initialize in the beginning.

LOG_LEVEL = "DEBUG"  # Log level for the logger
logging.basicConfig(level=LOG_LEVEL, format="%(message)s") # Configure the logger
selector = selectors.DefaultSelector() # Create a selector object

logger = logging.getLogger("ConcurrentApp") # Create a logger object
Response = namedtuple("Response", ["body", "headers"]) # Create a named tuple to store the response body and headers

Now let’s define the functions needed to implement our client.

The connect function makes the connection between client and server.

def connect(sock, url, _uuid):
    """ Function to connect to a server

    Args:
        sock (socket): Socket object
        url (SplitResult): URL object, parsed using urllib.parse.urlsplit
        _uuid (str): UUID of the task
    """
    logger.info("[%s] Connecting to a server", _uuid)
    try:
        # Plain HTTP on port 80: the URL scheme is ignored, so no TLS is used
        sock.connect((url.netloc, 80))
    except BlockingIOError:
        # Expected for a non-blocking socket: the connection is still in progress
        pass

I will only draw your attention to the parts that are worth mentioning and will not go line by line.

In the function, we create a connection using sock.connect. We enclose it in a try..except clause because, as we will see later, our socket object is configured to be “non-blocking”, and in that case the socket module raises the BlockingIOError exception.
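The same exception shows up for any operation that cannot complete immediately on a non-blocking socket. As a sketch that needs no network, the example below uses socket.socketpair() and triggers BlockingIOError on recv instead of connect; the variable names are made up for this illustration.

```python
import select
import socket

left, right = socket.socketpair()   # two connected sockets, no network needed
left.setblocking(False)

try:
    left.recv(16)                   # nothing has been sent yet
    outcome = "data"
except BlockingIOError:
    outcome = "would block"         # the call returned immediately instead of waiting

right.sendall(b"hello")
select.select([left], [], [], 1.0)  # wait (up to 1 s) until `left` is readable
data = left.recv(16)                # now the same call succeeds

left.close()
right.close()
print(outcome, data)
```

A blocking socket would simply have waited inside recv; the non-blocking one hands control back so that the program can do something else in the meantime.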

The write function is used to write to a server.

def write(url, _uuid):
    """ Function to write to a server

    Args:
        url (SplitResult): URL object, parsed using urllib.parse.urlsplit
        _uuid (str): UUID of the task
    """
    sock = yield  # Suspend until the event loop sends a writable socket
    logger.info("[%s] Writing to a server", _uuid)
    http_request = f"GET {url.path} HTTP/1.1\r\nHost: {url.netloc}\r\n\r\n"
    sock.sendall(http_request.encode())
    selector.modify(sock, selectors.EVENT_READ, _uuid)  # Now wait for the response

The write function is a coroutine function, as is visible from the yield statement. It will receive a socket object from the outer (caller) function (spoiler: from the event loop).

On the last line, we modify the selector. We will get back to this later in this article.

The read function is used to read the response from the server, parse the headers, and return the response body and headers.

def read(_uuid):
    """ Function to read from a server

    Args:
        _uuid (str): UUID of the task

    Returns:
        Response (namedtuple): namedtuple containing the response body and headers
    """
    headers = b""
    body = b""
    headers_parsed = False
    content_length = None  # Unknown until the headers have been parsed
    sock = yield  # Suspend until the event loop sends a readable socket

    logger.info("[%s] Reading from a server", _uuid)

    while True:
        try:
            data = sock.recv(512)
        except BlockingIOError:
            # No data available yet; retry (this busy-waits instead of yielding)
            continue

        if not data:  # The server closed the connection
            break

        if not headers_parsed:
            headers += data

            if b"\r\n\r\n" in headers:
                headers, rest = headers.split(b"\r\n\r\n", 1)
                headers_parsed = True

                for line in headers.split(b"\r\n"):
                    if line.startswith(b"Content-Length:"):
                        content_length = int(line.split(b":")[1].strip())
                        break
                body += rest
        else:
            body += data
        if content_length is not None and len(body) >= content_length:
            break
    selector.unregister(sock)  # Stop watching the socket before closing it
    sock.close()
    return Response(json.loads(body), headers.decode())

As we see, the read function has a few more lines than the previous two functions. The reason is that, besides reading from the server, it also parses the HTTP response and extracts the body and headers from the raw response.

The read function, like the write function, is a coroutine function. Again, it will receive a socket object from the event loop. The rest of the code just parses the HTTP response and extracts the headers and body. At the end, we construct our namedtuple object and return it. Later, we will see that this return value will be the return value of the yield from expression.

So the main functionality is there, but we are still missing two important functions. One of these, as you have probably guessed, is the event loop. The second is the function that binds the three functions defined above together. Let’s call it http_get.

def http_get(url):
    """ Function to fetch a URL. It binds the connect, write and read functions together.

    Args:
        url (str): URL to fetch

    Returns:
        Response (namedtuple): namedtuple containing the response body and headers

    Yields:
        _uuid (str): UUID of the task
    """
    _uuid = yield
    url_info = urllib.parse.urlsplit(url)
    client_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    client_sock.setblocking(False)
    selector.register(client_sock, selectors.EVENT_WRITE, _uuid)
    connect(client_sock, url_info, _uuid)

    yield from write(url_info, _uuid)
    result = yield from read(_uuid)
    return result

As you can guess, this function is also a coroutine. On the first line, it receives _uuid, which is a unique id generated by the event loop function.

Let’s for now skip the selector part (we will get back to it during the event loop explanation).

After the selector line, we use our connect function and initiate a connection.

Then we yield from the write function, which, in turn, returns control to the caller (the event loop) and waits for a socket object (entering the suspended state). And this is where we take the first steps toward concurrency. Later on, when the socket becomes available for writing inside event_loop, the coroutine will be resumed by receiving the socket object.

The next line that is noteworthy is the line where we receive the results. Here we see the beauty of the yield from statement. The namedtuple object that we returned from the read function is now the return value of the yield from expression, which we keep in the result variable.

Without further delay, let’s go to our event loop function.

The event loop function plays a key role in asynchronous programming. It’s responsible for driving (suspending and resuming) the coroutines. It’s also responsible for waiting for the input-output network events in order to resume the corresponding coroutines that were waiting for some operations.

It’s using the selectors standard library to get events for registered sockets, whether they are ready for read or write operations.
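As a minimal sketch of the selectors API on its own, the example below registers one end of a socket pair for read events, attaches a piece of data to the registration, and asks the selector which sockets are ready. It uses socket.socketpair() so that no network is involved; the name "task-demo" is an arbitrary label chosen for this illustration.

```python
import selectors
import socket

sel = selectors.DefaultSelector()
left, right = socket.socketpair()
left.setblocking(False)

# Attach arbitrary data (here a task name) to the registration
sel.register(left, selectors.EVENT_READ, data="task-demo")

idle = sel.select(timeout=0)        # nothing to read yet, so no events
right.sendall(b"ping")
ready = sel.select(timeout=1.0)     # now `left` is reported as readable

key, mask = ready[0]
is_read_event = bool(mask & selectors.EVENT_READ)
payload = key.fileobj.recv(16)      # key.fileobj is the registered socket
label = key.data                    # key.data is what we attached above

sel.unregister(left)
sel.close()
left.close()
right.close()
print(idle, label, is_read_event, payload)
```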

The event loop also should have some container data structure to hold all task objects (tasks_dict).

def event_loop(tasks):
    """ Function to run the event loop

    Args:
        tasks (list): List of tasks to run

    Returns:
        list: List of results from the tasks
    """
    tasks_dict = {}
    results_dict = {}

    logger.debug("[eventloop] Adding %s tasks to eventloop.", len(tasks))

    for _index, _task in enumerate(tasks):
        _uuid = f"task-{_index}"
        _task.send(None)  # Prime/Advance the generator
        _task.send(_uuid)  # Send the UUID to the generator
        tasks_dict[_uuid] = _task

    try:
        while tasks_dict:
            logger.debug("[eventloop] Waiting for an event...")
            events = selector.select()
            logger.debug("[eventloop] Got %d event", len(events))
            for key, mask in events:
                read_event = mask & selectors.EVENT_READ
                event_type = "READ" if read_event else "WRITE"
                _uuid = key.data
                logger.debug("[eventloop] [%s] New '%s' event", _uuid, event_type)
                task = tasks_dict.pop(_uuid)
                try:
                    task.send(key.fileobj)
                    tasks_dict[_uuid] = task
                except StopIteration as e:
                    results_dict[_uuid] = e.value
                    logger.info("[eventloop] [%s] was completed", _uuid)
        logger.debug("[eventloop] Exiting. All tasks are completed")
    finally:
        selector.close()
    return list(results_dict.values())

Let’s briefly walk through some parts of the code inside the event_loop function and make some explanations there.

tasks_dict = {}
results_dict = {}

logger.debug("[eventloop] Adding %s tasks to eventloop.", len(tasks))

for _index, _task in enumerate(tasks):
    _uuid = f"task-{_index}"
    _task.send(None)  # Prime/Advance the generator
    _task.send(_uuid)  # Send the UUID to the generator
    tasks_dict[_uuid] = _task

We do some preparation at the beginning of the function by looping over the tasks. We generate a unique id for each task, which will be attached to the task and will be the identifier of the task. It will be used by event_loop to retrieve and drive the coroutine.

After the generation of a unique ID, we prime the coroutine (there are well-known decorator approaches for making the priming of the coroutines automated, but for simplicity, we are not using them in our solution).
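For reference, such a priming decorator could look roughly like the sketch below. The names primed and running_total are made up for this illustration. One caveat worth keeping in mind: yield from primes the subgenerator itself, so a decorator like this is meant for coroutines that are driven directly with send, not for ones used as subgenerators.

```python
from functools import wraps


def primed(generator_function):
    """Decorator: create the coroutine and advance it to its first yield."""
    @wraps(generator_function)
    def wrapper(*args, **kwargs):
        gen = generator_function(*args, **kwargs)
        next(gen)          # prime: run up to the first yield
        return gen
    return wrapper


@primed
def running_total():
    total = 0
    while True:
        received = yield total
        total += received


acc = running_total()      # already primed, so send() works right away
first = acc.send(5)
second = acc.send(7)
print(first, second)  # 5 12
```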

After priming the coroutine, we send a unique ID to the task, which is waiting for it. You can already probably guess that the http_get function is a so-called task.

Then we store the mapping of unique ids to tasks inside the tasks_dict dictionary.

We have reached the important part, which is the while loop.

while tasks_dict:
    logger.debug("[eventloop] Waiting for an event...")
    events = selector.select()
    logger.debug("[eventloop] Got %d event", len(events))
    for key, mask in events:
        read_event = mask & selectors.EVENT_READ
        event_type = "READ" if read_event else "WRITE"
        _uuid = key.data
        logger.debug("[eventloop] [%s] New '%s' event", _uuid, event_type)
        task = tasks_dict.pop(_uuid)
        try:
            task.send(key.fileobj)
            tasks_dict[_uuid] = task
        except StopIteration as e:
            results_dict[_uuid] = e.value
            logger.info("[eventloop] [%s] was completed", _uuid)
logger.debug("[eventloop] Exiting. All tasks are completed")

It runs as long as there are tasks in the tasks_dict dictionary.

Then it waits for events on the sockets, which were registered inside the http_get and write functions.

For the selector to notify us that a socket is ready for some operation, we first have to register the socket. That is what was happening inside the http_get and write functions.

Inside the http_get function, we register our socket for EVENT_WRITE, which means that selector.select() will return an event when the registered socket is ready for writing, i.e., ready to accept the data we send.

def http_get(url):
    ...
    selector.register(client_sock, selectors.EVENT_WRITE, _uuid)
    ...

Inside the write function, we modify the selector that was already registered for WRITE events into READ (EVENT_READ), because when we write to the server, we should wait to read the response from it.

def write(url, _uuid):
    ...
    selector.modify(sock, selectors.EVENT_READ, _uuid)

Whenever we receive an event from the selector, we can extract the socket object and the data that were passed during the registration phase.

The socket object can be retrieved from key.fileobj, and the uuid from key.data.

Then, having the uuid, we get (pop) the corresponding task from the tasks_dict dictionary and drive the coroutine by sending the socket object into it. On the next line, we insert the coroutine back into the dictionary and wait for the next events. If a StopIteration exception is raised during task.send(key.fileobj), it means that the coroutine has finished, and we can extract the returned value, which is attached to the exception as its value.

...
try:
    task.send(key.fileobj)
    tasks_dict[_uuid] = task
except StopIteration as e:
    results_dict[_uuid] = e.value
...
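The drive-and-collect pattern can be isolated from the networking part. The sketch below is a stripped-down scheduler that has no selector at all: it simply resumes each task in turn (round-robin) and collects return values from StopIteration in the same way. It is an illustration of the pattern under that simplification, not the event loop from this article; countdown and run_round_robin are hypothetical names.

```python
from collections import deque


def countdown(name, n):
    """A toy task: yields once per step, then returns a result."""
    while n > 0:
        yield f"{name}:{n}"
        n -= 1
    return f"{name} done"


def run_round_robin(tasks):
    """Resume each task in turn until all have finished; return (results, trace)."""
    queue = deque(tasks.items())
    results, trace = {}, []
    while queue:
        name, task = queue.popleft()
        try:
            trace.append(task.send(None))   # resume the task until its next yield
            queue.append((name, task))      # still running: schedule it again
        except StopIteration as stop:
            results[name] = stop.value      # finished: keep its return value
    return results, trace


results, trace = run_round_robin({"a": countdown("a", 2), "b": countdown("b", 1)})
print(trace)    # ['a:2', 'b:1', 'a:1']
print(results)  # {'b': 'b done', 'a': 'a done'}
```

The real event loop differs in one important way: instead of resuming tasks blindly, it resumes a task only when the selector reports that the task’s socket is ready.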

Last but not least, we run our asynchronous HTTP application with the following few lines of code:


tasks = [
    http_get("https://httpbin.org/anything"),
    http_get("https://httpbin.org/anything"),
]
results = event_loop(tasks)
# for _ind, result in enumerate(results):
#     logger.debug("Body %d: %s", _ind, json.dumps(result.body, indent=2))
#     logger.debug("Header %d: %s", _ind, result.headers)

Here we create a task list by specifying a URL to fetch, and afterwards pass the list into the event_loop.

Optionally, we can loop over the results and retrieve them.

You can find complete code inside my github repo.

Results

Here are the results of running the application in DEBUG mode.

python3 concurrent_http_calls.py
[eventloop] Adding 2 tasks to eventloop.
[task-0] Connecting to a server
[task-1] Connecting to a server
[eventloop] Waiting for an event...
[eventloop] Got 1 event
[eventloop] [task-0] New 'WRITE' event
[task-0] Writing to a server
[eventloop] Waiting for an event...
[eventloop] Got 1 event
[eventloop] [task-1] New 'WRITE' event
[task-1] Writing to a server
[eventloop] Waiting for an event...
[eventloop] Got 1 event
[eventloop] [task-0] New 'READ' event
[task-0] Reading from a server
[eventloop] [task-0] was completed
[eventloop] Waiting for an event...
[eventloop] Got 1 event
[eventloop] [task-1] New 'READ' event
[task-1] Reading from a server
[eventloop] [task-1] was completed
[eventloop] Exiting. All tasks are completed

I leave it to you to work through the DEBUG output above.

For now, let’s have a look at the output in the INFO mode of the logging module.

python3 concurrent_http_calls.py
[task-0] Connecting to a server
[task-1] Connecting to a server
[task-0] Writing to a server
[task-1] Writing to a server
[task-0] Reading from a server
[eventloop] [task-0] was completed
[task-1] Reading from a server
[eventloop] [task-1] was completed

Here we see that we achieved our goal: we wrote a concurrent, asynchronous HTTP client application.

If we run the program several times, we will see that the order in which the tasks complete differs between runs. In the example above, task-0 was scheduled and completed first.

In this case, we see a different picture:

python3 concurrent_http_calls.py
[task-0] Connecting to a server
[task-1] Connecting to a server
[task-0] Writing to a server
[task-1] Writing to a server
[task-1] Reading from a server
[eventloop] [task-1] was completed
[task-0] Reading from a server
[eventloop] [task-0] was completed

Even though task-0 was scheduled first, task-1 completed first.

I would suggest trying it out yourself and doing some experiments on your own. It’s time to speak about native coroutines.

Native coroutines

A coroutine defined with async def is called a native coroutine.

You can delegate from a native coroutine to another native coroutine or to a generator-based coroutine using the await keyword, like how classic coroutines use yield from. It uses the yield from implementation with an extra step of validating its argument.
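The validation step can be observed directly. In the sketch below, await accepts an object that implements __await__ but rejects a plain generator that was not decorated with @types.coroutine, even though yield from would have accepted it. The coroutines are driven by hand with send so that no event loop is needed; all names are made up for this illustration.

```python
class Ready:
    """A minimal awaitable: __await__ must return an iterator."""

    def __init__(self, value):
        self.value = value

    def __await__(self):
        yield "suspended"      # hand control to whoever drives the coroutine
        return self.value


def plain_generator():
    yield 1


async def accepts_awaitable():
    return await Ready(42)


async def rejects_plain_generator():
    await plain_generator()    # a plain generator is not awaitable


good = accepts_awaitable()
first = good.send(None)        # runs until the yield inside __await__
try:
    good.send(None)
except StopIteration as stop:
    awaited_value = stop.value

bad = rejects_plain_generator()
try:
    bad.send(None)
    rejected = False
except TypeError:
    rejected = True            # await refused its argument

print(first, awaited_value, rejected)  # suspended 42 True
```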

As the main idea of this article was to understand how native coroutines actually work and what is happening inside the event loop, I will not go too deep into asyncio and the async/await topic itself, but I can refer you to PEP-492, which brought native coroutines into the Python language.

Now let’s implement the asynchronous HTTP client using native coroutines and the asyncio library.

import asyncio
import logging
import urllib.parse

LOG_LEVEL = "INFO"  # Log level for the logger
logging.basicConfig(level=LOG_LEVEL, format="%(message)s")  # Configure the logger
logger = logging.getLogger("AsyncioApp")  # Create a logger object

async def read(reader, uuid):
    logger.info("[%s] Reading from a server", uuid)
    result = ""
    while True:
        line = await reader.readline()
        if not line:  # EOF: the server closed the connection
            break

        line = line.decode("latin1").rstrip()
        if line:
            result += line
    return result

async def http_get(url, uuid):
    url = urllib.parse.urlsplit(url)
    logger.info("[%s] Connecting to a server", uuid)

    reader, writer = await asyncio.open_connection(url.hostname, 80)  # Connect to a server

    query = f"GET {url.path or '/'} HTTP/1.0\r\nHost: {url.hostname}\r\n\r\n"
    logger.info("[%s] Writing to a server", uuid)
    writer.write(query.encode("latin-1"))  # Write to a server
    await writer.drain()  # Wait until the request has been flushed

    response = await read(reader, uuid)  # Read from a server
    writer.close()  # Release the connection
    await writer.wait_closed()
    logger.info("[%s] was completed", uuid)
    return response

async def main():
    url = "https://httpbin.org/anything"
    tasks = [
        http_get(url, "task-0"),
        http_get(url, "task-1"),
    ]
    results = await asyncio.gather(*tasks)
    # print(len(results))

asyncio.run(main())

Compared to the classic-coroutine solution above, we see that the function responsible for the connection is a coroutine, while the function responsible for writing is not a coroutine; it’s a regular function. So in the classic coroutine example, we initiated the connection and afterwards waited for the writing operation to be triggered by the event loop, but in the native coroutine example, we initiate the connection via a coroutine, and once the connection is established, it is ready for both writing and reading.

The asyncio.open_connection function returns two high-level objects, which are stream objects. Streams are high-level async/await-ready primitives to work with network connections. Streams allow sending and receiving data without using callbacks or low-level protocols and transports. Under the hood, they share the same socket object.
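The stream objects can be tried out without a real server. In the sketch below, one end of a socket.socketpair() is handed to asyncio.open_connection through its sock parameter, and the other end plays the role of the peer as a plain blocking socket. This setup is an assumption made purely for illustration (a real client connects by host and port), and stream_demo is a hypothetical name.

```python
import asyncio
import socket


async def stream_demo():
    ours, theirs = socket.socketpair()
    theirs.settimeout(1.0)   # the peer is a plain blocking socket in this sketch
    reader, writer = await asyncio.open_connection(sock=ours)

    # The writer exposes the underlying socket: it is the one we passed in
    same_socket = writer.get_extra_info("socket").fileno() == ours.fileno()

    writer.write(b"ping\n")          # a regular function call, not a coroutine
    await writer.drain()             # wait until the data has been flushed
    request = theirs.recv(16)        # what the peer received

    theirs.sendall(b"pong\n")
    theirs.close()                   # produces EOF on our side
    line = await reader.readline()   # b"pong\n"
    eof = await reader.readline()    # b"" once the peer has closed

    writer.close()
    await writer.wait_closed()
    return same_socket, request, line, eof


stream_result = asyncio.run(stream_demo())
print(stream_result)
```

Note how reading is awaited while writing is a plain call followed by an awaited drain, which mirrors the split described above.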

The output of the code looks like this:

python3 asyncio_http_client.py
[task-0] Connecting to a server
[task-1] Connecting to a server
[task-0] Writing to a server
[task-0] Reading from a server
[task-1] Writing to a server
[task-1] Reading from a server
[task-0] was completed
[task-1] was completed

For completeness, I will give an example of a modern asynchronous Python library called aiohttp, which is used to implement asynchronous HTTP clients and servers.

import asyncio

import aiohttp


async def fetch_url(url, uuid):
    async with aiohttp.ClientSession() as session:
        print("[%s] Connecting to a server" % uuid)
        async with session.get(url) as response:
            print("[%s] Writing to a server" % uuid)
            result = await response.text()
            print("[%s] Reading from a server" % uuid)
    print("[%s] was completed" % uuid)
    return result


async def main():
    tasks = [
        fetch_url("https://httpbin.org/anything", "task-0"),
        fetch_url("https://httpbin.org/anything", "task-1"),
    ]
    results = await asyncio.gather(*tasks)


asyncio.run(main())

It’s a high-level framework, and it’s not possible to put prints in a granular way to understand all the details deeply, but I put them in places that are more or less describing the process.

And the output looks like the following:

python3 aiohttp_example.py
[task-0] Connecting to a server
[task-1] Connecting to a server
[task-0] Writing to a server
[task-0] Reading from a server
[task-0] was completed
[task-1] Writing to a server
[task-1] Reading from a server
[task-1] was completed

If we draw parallels between our solution (classic generator) and the aiohttp one, that would be the following:

connect → aiohttp.ClientSession
write → session.get
read → response.text
event_loop → asyncio.run
http_get → fetch_url

That was it!

Written by Vardan Aloyan

Lead Software Engineer @ Swisscom
