Async Processing in Python: Make Data Pipelines Scream

Arup Nanda · Published in Level Up Coding · Sep 7, 2020 · 9 min read


How do you enable parallel processing in Python? Threading is one way, but building a fully resilient multi-threaded process is hard. Asynchronous processing is a much simpler and more practical way to introduce parallelism into your applications. In this short blog post, you will learn how to enable simple async functionality.

The “Normal” Way

Let’s examine the standard way a program is written. Consider a process defined in a function called myproc() that does something that takes 5 seconds. We will simulate it with a sleep of 5 seconds. When the function is called, we print “myProc started …”. Likewise, we print “myProc finished …” when the function finishes execution. Using the time module, we will record the elapsed time. Here is the code.

import time

def myproc():
    print("myProc started ...")
    t1 = time.perf_counter()
    time.sleep(5)
    t = time.perf_counter() - t1
    print(f" myProc finished in {t:0.5f} seconds.")

def main():
    for _ in range(5):
        myproc()

if __name__ == "__main__":
    start_sec = time.perf_counter()
    main()
    elapsed_secs = time.perf_counter() - start_sec
    print(f"Job finished in {elapsed_secs:0.5f} seconds.")

Let’s call this file sync.py. When we execute this script in Python, we get:

myProc started ...
myProc finished in 5.00262 seconds.
myProc started ...
myProc finished in 5.00281 seconds.
myProc started ...
myProc finished in 5.00011 seconds.
myProc started ...
myProc finished in 5.00042 seconds.
myProc started ...
myProc finished in 5.00504 seconds.
Job finished in 25.01145 seconds.

The job took a total of 25 seconds, as expected: each run of myproc() takes 5 seconds, and running it 5 times sequentially gives 25 seconds.

The Async Way

Now, let’s rewrite it in an asynchronous manner. Here is the modified code; let’s call it async1.py.

import asyncio
import time

async def myproc():
    print("myProc started ...")
    t1 = time.perf_counter()
    await asyncio.sleep(5)
    t = time.perf_counter() - t1
    print(f" myProc finished in {t:0.5f} seconds.")

async def main():
    await asyncio.gather(
        myproc(),
        myproc(),
        myproc(),
        myproc(),
        myproc()
    )

if __name__ == "__main__":
    start_sec = time.perf_counter()
    asyncio.run(main())
    elapsed_secs = time.perf_counter() - start_sec
    print(f"Job finished in {elapsed_secs:0.5f} seconds.")

Here is the output:

myProc started ...
myProc started ...
myProc started ...
myProc started ...
myProc started ...
myProc finished in 5.00337 seconds.
myProc finished in 5.00347 seconds.
myProc finished in 5.00349 seconds.
myProc finished in 5.00351 seconds.
myProc finished in 5.00353 seconds.
Job finished in 5.00495 seconds.

Whoa! The whole job finished in just 5 seconds (the time it takes for each run of the function myproc()). How come?

The Async/Await Combo

The trick is the inclusion of async and await keywords in the above code. These keywords make sure that the function behaves in an async manner. Note the following line:

await asyncio.sleep(5)

The await keyword tells Python not to wait for the sleep to complete, but to hand control back to the caller temporarily. Let me repeat that, as it is a pretty powerful concept. The keyword tells the Python interpreter to give control back to the caller (in this case the event loop running main()), but only temporarily: the function gets control back once the awaited operation is over. Note the following line:

asyncio.run(main())

This starts the event loop and runs main() on it. Inside main(), asyncio.gather() schedules all five myproc() calls. The first call runs until it hits the await and then hands control back to the event loop; the loop starts the next myproc(), which likewise yields as soon as the await keyword is encountered, and so on.

However, is the control gone from myproc() forever? Not at all. The moment the sleep completes (after about 5 seconds, as designed), control passes back to the function, which executes the next line: printing the finishing message along with the elapsed time.

The function call still took 5 seconds (as expected), but the program executed all the function calls in sort of a parallel manner. Hence the overall time was also just 5 seconds.

Note, I used “sort of parallel”. The execution was actually not in parallel. We put conditions in the logic of the function to say when to hand control back to the caller. It mimicked parallelism, but it’s not actually parallel; it’s merely that the called function made a non-blocking call. Confused?
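To see the hand-off concretely, here is a minimal sketch (the coroutine names and delays are made up for illustration, not part of the article’s scripts). Two coroutines sleep for different durations; the one with the shorter sleep finishes first, because each await hands control back to the event loop instead of blocking:

```python
import asyncio

completed = []  # records the order in which the coroutines finish

async def task(name, delay):
    # await yields control to the event loop, so both tasks run concurrently
    await asyncio.sleep(delay)
    completed.append(name)

async def main():
    # both coroutines are scheduled at once; neither blocks the other
    await asyncio.gather(task("slow", 0.2), task("fast", 0.1))

asyncio.run(main())
print(completed)  # ['fast', 'slow'] -- the shorter sleep wakes up first
```

If the sleeps were ordinary time.sleep() calls, the finish order would simply follow the call order, because each call would block the loop.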

We Control the Async Part

To illustrate the subtle concept of how we control the asynchronous nature of the calls, let’s modify the script async1.py as follows and call it async2.py.

import asyncio
import time

async def myproc():
    print("myProc started ...")
    t1 = time.perf_counter()
    await asyncio.sleep(2.5)
    time.sleep(2.5)
    t = time.perf_counter() - t1
    print(f" myProc finished in {t:0.5f} seconds.")

async def main():
    await asyncio.gather(
        myproc(),
        myproc(),
        myproc(),
        myproc(),
        myproc()
    )

if __name__ == "__main__":
    start_sec = time.perf_counter()
    asyncio.run(main())
    elapsed_secs = time.perf_counter() - start_sec
    print(f"Job finished in {elapsed_secs:0.5f} seconds.")

It’s a simple but important change. Instead of making the function spend all its 5 seconds asynchronously, I split it as 2.5 seconds each on asyncio.sleep() (asynchronous) and time.sleep() (synchronous). Here is the result when we run the script:

myProc started ...
myProc started ...
myProc started ...
myProc started ...
myProc started ...
myProc finished in 5.00751 seconds.
myProc finished in 7.50905 seconds.
myProc finished in 10.01197 seconds.
myProc finished in 12.51726 seconds.
myProc finished in 15.02254 seconds.
Job finished in 15.02414 seconds.

The output is illuminating. Note how all the function calls started immediately, as expected. However, each call took progressively longer: exactly 2.5 seconds more than the previous one. Why?

It’s because only 2.5 seconds of each call was async. Once the async sleep finished, control came back to the function, but the second sleep was not async: the function did not return control to its caller, main() in this case, until that sleep completed. It was a blocking call. That is why each subsequent function call had to wait for the previous one to finish its blocking sleep. So it was not parallel execution; it was async execution that we controlled.
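When the blocking part is unavoidable, one way to keep the event loop free is to push it onto a worker thread with asyncio.to_thread() (available in Python 3.9+). This is a sketch of that idea, not one of the article’s scripts, with the delays scaled down to 0.25 seconds to keep it quick:

```python
import asyncio
import time

async def myproc():
    await asyncio.sleep(0.25)                  # async part: yields to the event loop
    await asyncio.to_thread(time.sleep, 0.25)  # blocking part, run in a worker thread

async def main():
    # all five calls overlap, because neither part blocks the event loop
    await asyncio.gather(*(myproc() for _ in range(5)))

start = time.perf_counter()
asyncio.run(main())
elapsed = time.perf_counter() - start
print(f"Job finished in {elapsed:0.5f} seconds.")  # ~0.5s instead of ~1.5s
```

With plain time.sleep(0.25) in place of the to_thread() call, the five calls would serialize on the blocking part, just as async2.py does.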

The Order of Calls Matters

To illustrate how this control can be useful and important, let’s make another small change to the program and call it async3.py. In this version, we simply swap the order of the async and sync sleep calls.

import asyncio
import time

async def myproc():
    print("myProc started ...")
    t1 = time.perf_counter()
    time.sleep(2.5)
    await asyncio.sleep(2.5)
    t = time.perf_counter() - t1
    print(f" myProc finished in {t:0.5f} seconds.")

async def main():
    await asyncio.gather(
        myproc(),
        myproc(),
        myproc(),
        myproc(),
        myproc()
    )

if __name__ == "__main__":
    start_sec = time.perf_counter()
    asyncio.run(main())
    elapsed_secs = time.perf_counter() - start_sec
    print(f"Job finished in {elapsed_secs:0.5f} seconds.")

Note: it’s exactly the same as async2.py; only the order of the sleep calls changed. Earlier we called the async sleep first and then the sync sleep; here we call the sync sleep first. When you run it, you will get this output:

myProc started ...
myProc started ...
myProc started ...
myProc started ...
myProc started ...
myProc finished in 12.51053 seconds.
myProc finished in 10.00526 seconds.
myProc finished in 7.50407 seconds.
myProc finished in 5.00093 seconds.
myProc finished in 5.00068 seconds.
Job finished in 15.01211 seconds.

Notice how the first line of the function call, “myProc started …”, now comes up staggered: each appears 2.5 seconds after the previous one. Why? Simple: the function starts with a blocking sleep of 2.5 seconds, so it holds control that long before handing it back. The main() program can’t start the next myproc() until that time has passed.

After 2.5 seconds, the function encounters the async sleep call and returns control to main(). However, by the time main() got back to the first call of myproc(), all the other calls had already run their blocking sleeps. Hence you see an interesting pattern here: the first call to myproc() took the longest, as opposed to the previous version, where the last call took the longest. The overall time remained the same, though.

This is why the order in which you call the sync and async parts of a function matters: it determines when the program gets results back from sub-components, even though the overall time does not vary.
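Incidentally, while the finish order can vary like this, asyncio.gather() always returns results in the order the coroutines were passed to it, regardless of which finished first. A small sketch (the work() coroutine and its delays are made up for illustration):

```python
import asyncio

async def work(i, delay):
    await asyncio.sleep(delay)
    return i

async def main():
    # results come back in argument order, not completion order
    return await asyncio.gather(work(1, 0.2), work(2, 0.1), work(3, 0.05))

results = asyncio.run(main())
print(results)  # [1, 2, 3] even though work(3, ...) finished first
```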

Why is this important? Well, let’s add another small task to this program: computing a running total inside the function. To keep it simple, we will just increment a variable sum by 1 on each invocation. In addition, let’s display the order in which the functions run. We will change the function to accept a parameter called callid, a number that identifies each call distinctly, and build a chain of the call ids to show the order in which the functions executed. First, we will do this in the original sequence: async sleep, then sync sleep.

import asyncio
import time

chain = ""
sum = 0

async def myproc(callid):
    global chain
    global sum
    print(f"myProc {callid} started ...")
    t1 = time.perf_counter()
    await asyncio.sleep(2.5)
    chain = chain + "->" + str(callid)
    sum = sum + 1
    time.sleep(2.5)
    t = time.perf_counter() - t1
    print(f" myProc {callid} finished in {t:0.5f} seconds. sum = {sum} chain {chain}")

async def main():
    await asyncio.gather(
        myproc(1),
        myproc(2),
        myproc(3),
        myproc(4),
        myproc(5)
    )

if __name__ == "__main__":
    start_sec = time.perf_counter()
    asyncio.run(main())
    elapsed_secs = time.perf_counter() - start_sec
    print(f"Job finished in {elapsed_secs:0.5f} seconds.")

Here is the output:

myProc 1 started ...
myProc 2 started ...
myProc 3 started ...
myProc 4 started ...
myProc 5 started ...
myProc 1 finished in 5.00606 seconds. sum = 1 chain ->1
myProc 2 finished in 7.51137 seconds. sum = 2 chain ->1->2
myProc 3 finished in 10.01224 seconds. sum = 3 chain ->1->2->3
myProc 4 finished in 12.51499 seconds. sum = 4 chain ->1->2->3->4
myProc 5 finished in 15.01671 seconds. sum = 5 chain ->1->2->3->4->5
Job finished in 15.01861 seconds.

The functions were called in the sequence we passed, as shown in the chain; but note the sum variable: each function call sees a different value. Why?

It’s simple. When the first function call hit its await, control went to the second call, and so on. By the time the second call (callid=2) got around to its computation, the first call (callid=1) had already woken up and done its work: at that point sum was 0, so it added 1 to get 1. The second call therefore found sum already at 1 and came up with 1 + 1 = 2. And so on down the chain for the rest of the calls.

Why is it important?

If all you need is the final sum, which is available at the end of the job, there is no problem: the number will be correct. However, if you are using sum inside the function, e.g. checking whether sum is greater than 3, then clearly calls 1, 2, and 3 will fail that check while 4 and 5 succeed. This could introduce a bug.

Contrast that with a small variation where you reverse the order of the sync and async calls:

import asyncio
import time

chain = ""
sum = 0

async def myproc(callid):
    global chain
    global sum
    print(f"myProc {callid} started ...")
    t1 = time.perf_counter()
    time.sleep(2.5)
    chain = chain + "->" + str(callid)
    sum = sum + 1
    await asyncio.sleep(2.5)
    t = time.perf_counter() - t1
    print(f" myProc {callid} finished in {t:0.5f} seconds. sum = {sum} chain {chain}")

async def main():
    await asyncio.gather(
        myproc(1),
        myproc(2),
        myproc(3),
        myproc(4),
        myproc(5)
    )

if __name__ == "__main__":
    start_sec = time.perf_counter()
    asyncio.run(main())
    elapsed_secs = time.perf_counter() - start_sec
    print(f"Job finished in {elapsed_secs:0.5f} seconds.")

Here is the output:

myProc 1 started ...
myProc 2 started ...
myProc 3 started ...
myProc 4 started ...
myProc 5 started ...
myProc 1 finished in 12.51241 seconds. sum = 5 chain ->1->2->3->4->5
myProc 2 finished in 10.01062 seconds. sum = 5 chain ->1->2->3->4->5
myProc 3 finished in 7.51010 seconds. sum = 5 chain ->1->2->3->4->5
myProc 4 finished in 5.00613 seconds. sum = 5 chain ->1->2->3->4->5
myProc 5 finished in 5.00680 seconds. sum = 5 chain ->1->2->3->4->5
Job finished in 15.01523 seconds.

The behavior is different. Note how the value of sum is the same in every call, and the chain is the same too. Why? Because the blocking sleeps ran serially up front, so by the time any call got to print its results, all the assignments had already happened.

Bottom line: pay attention to which parts of the function are sync and which are async, and where you use shared variables. You could get different results without even knowing it.

In Summary

  • You can use async processing in Python to mimic parallel processing.
  • But it is not multithreading. You control which parts are async and which are not.
  • The combination of two keywords makes it possible: async applies to a function definition to tell Python the function is asynchronous; await tells the interpreter to pass control back to the caller temporarily.
  • Pay attention to how the logic inside the function behaves for sync and async events. Shared variables could yield wrong results if not accessed at the right place.
