Getting Started with Python Async Programming

# Introduction

Most Python applications spend significant time waiting on APIs, databases, file systems, and network services. Async programming allows a program to execute other tasks instead of pausing and blocking while waiting for I/O operations.

In this tutorial, you will learn the fundamentals of async programming in Python using clear code examples. We’ll compare synchronous and asynchronous execution, explain how the event loop works, and apply the async pattern to real-world scenarios like concurrent API requests and background tasks.

By the end of this guide, you’ll understand when async programming is useful, how to use async and await correctly, and how to write scalable and reliable async Python code.

# Defining Async Programming in Python

Async programming allows a program to pause a task that is waiting for an operation to complete and execute other tasks in the meantime.

Core building blocks include:

  • async def to define a coroutine
  • await for a non-blocking wait
  • The event loop for task scheduling

Note: Async programming improves throughput, not raw computation speed.
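These three building blocks fit together in a minimal sketch (the greet coroutine is a hypothetical example, not part of any library):

```python
import asyncio

# async def defines a coroutine; calling it creates a coroutine object
async def greet():
    # await suspends this coroutine and hands control to the event loop
    await asyncio.sleep(0.1)
    return "hello"

# asyncio.run starts the event loop and runs the coroutine to completion
result = asyncio.run(greet())
print(result)
```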

# Understanding Async Event Loop in Python

The event loop is responsible for managing and executing asynchronous tasks.

Key responsibilities include:

  • Tracking pending and completed tasks
  • Switching execution when tasks are waiting for I/O
  • Coordinating concurrency without threads

Python uses the asyncio library as its standard async runtime.
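To make the scheduling concrete, here is a small illustrative sketch (the worker and order names are hypothetical) that records how the event loop switches between two tasks whenever one of them awaits:

```python
import asyncio

order = []

async def worker(name):
    order.append(f"{name} start")
    # Awaiting hands control back to the event loop,
    # which switches to the other pending task
    await asyncio.sleep(0)
    order.append(f"{name} end")

async def main():
    # create_task schedules both coroutines on the event loop
    t1 = asyncio.create_task(worker("a"))
    t2 = asyncio.create_task(worker("b"))
    await t1
    await t2

asyncio.run(main())
print(order)  # ['a start', 'b start', 'a end', 'b end']
```

Both tasks start before either finishes: each `await` is a switching point where the loop hands execution to the next ready task.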

# Comparing Sequential vs Async Execution in Python

This section shows how blocking sequential code compares to asynchronous concurrent execution and how async reduces the total wait time for I/O-bound tasks.

## Examining a Sequential Blocking Example

Sequential execution runs tasks one after the other. If a task performs a blocking operation, the entire program waits until that operation completes. This approach is simple but inefficient for I/O-bound workloads where waiting dominates the execution time.

This function simulates a blocking task. The call to time.sleep stops the entire program for the specified number of seconds.

import time

def download_file(name, seconds):
    print(f"Starting {name}")
    time.sleep(seconds)
    print(f"Finished {name}")

The timer starts before the first call and stops after all three calls complete. Each function runs only after the previous one has finished.

start = time.perf_counter()

download_file("file-1", 2)
download_file("file-2", 2)
download_file("file-3", 2)

end = time.perf_counter()
print(f"(TOTAL SYNC) took {end - start:.4f} seconds")

Output:

  • file-1 starts and blocks the program for two seconds
  • file-2 starts only after file-1 finishes
  • file-3 starts only after file-2 finishes

The total runtime is the sum of all delays, approximately six seconds.

Starting file-1
Finished file-1
Starting file-2
Finished file-2
Starting file-3
Finished file-3
(TOTAL SYNC) took 6.0009 seconds

## Examining an Asynchronous Concurrent Example

Asynchronous execution allows tasks to run concurrently. When a task reaches an awaited I/O operation, it stops and allows other tasks to continue. This overlapping of latency improves throughput significantly.

This async function defines a coroutine. The await asyncio.sleep call suspends only the current task, not the entire program.

import asyncio
import time

async def download_file(name, seconds):
    print(f"Starting {name}")
    await asyncio.sleep(seconds)
    print(f"Finished {name}")

asyncio.gather schedules all three coroutines to run concurrently on the event loop.

async def main():
    start = time.perf_counter()

    await asyncio.gather(
        download_file("file-1", 2),
        download_file("file-2", 2),
        download_file("file-3", 2),
    )

    end = time.perf_counter()
    print(f"(TOTAL ASYNC) took {end - start:.4f} seconds")

asyncio.run starts the event loop and executes the async program.

asyncio.run(main())

Output:

  • All three tasks start at approximately the same time
  • Each task waits for two seconds independently
  • While one task is waiting, others continue to execute
  • The total runtime is close to the longest single delay, about two seconds

Starting file-1
Starting file-2
Starting file-3
Finished file-1
Finished file-2
Finished file-3
(TOTAL ASYNC) took 2.0005 seconds

# Knowing How await Works in Python Async Code

The await keyword tells Python that a coroutine can pause and allow other tasks to run.

Wrong usage:

async def task():
    asyncio.sleep(1)

Correct usage:

async def task():
    await asyncio.sleep(1)

Failing to use await prevents concurrency and may generate a "coroutine was never awaited" runtime warning.
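To see the difference directly, this small sketch (the task and main names are hypothetical) shows that a coroutine called without await is just a coroutine object, while awaiting it produces the actual result:

```python
import asyncio

async def task():
    await asyncio.sleep(0)
    return "done"

async def main():
    # Forgetting await produces a coroutine object instead of a result
    unawaited = task()
    print(type(unawaited).__name__)  # coroutine
    unawaited.close()  # close it explicitly to avoid a "never awaited" warning

    # Awaiting actually runs the coroutine and yields its return value
    return await task()

result = asyncio.run(main())
print(result)  # done
```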

# Running Multiple Async Tasks Using asyncio.gather

asyncio.gather allows multiple coroutines to run concurrently and aggregates their results after all tasks complete. It is commonly used when multiple independent async operations can run at the same time.

The job coroutine simulates an asynchronous task. It prints a start message, waits one second using a non-blocking sleep, then prints a finish message and returns a result.

import asyncio
import time

async def job(job_id, delay=1):
    print(f"Job {job_id} started")
    await asyncio.sleep(delay)
    print(f"Job {job_id} finished")
    return f"Completed job {job_id}"

asyncio.gather schedules all three jobs to run concurrently on the event loop. Each task runs until it reaches an awaited operation.

async def main():
    start = time.perf_counter()

    results = await asyncio.gather(
        job(1),
        job(2),
        job(3),
    )

    end = time.perf_counter()

    print("\nResults:", results)
    print(f"(TOTAL WALL TIME) {end - start:.4f} seconds")

asyncio.run(main())

Output:

  • All three tasks start at approximately the same time
  • Each task waits for one second independently
  • While one job waits, others keep running
  • Results are returned in the order the coroutines were passed to asyncio.gather
  • Total execution time is close to one second, not three

Job 1 started
Job 2 started
Job 3 started
Job 1 finished
Job 2 finished
Job 3 finished

Results: ['Completed job 1', 'Completed job 2', 'Completed job 3']
(TOTAL WALL TIME) 1.0013 seconds

This pattern is fundamental for concurrent network requests, database queries, and other I/O-bound operations.
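The introduction also mentioned background tasks. When you want work to continue alongside the main coroutine instead of waiting on everything at once with gather, asyncio.create_task is the usual tool. A minimal sketch, with hypothetical background_logger and main names and illustrative timings:

```python
import asyncio

async def background_logger(messages):
    # Ticks in the background while the main coroutine does its own work
    for i in range(3):
        await asyncio.sleep(0.1)
        messages.append(f"heartbeat {i}")

async def main():
    messages = []
    # Schedule the coroutine as a background task; it starts running
    # the next time the main coroutine awaits
    task = asyncio.create_task(background_logger(messages))

    # Simulate the main workload while the background task ticks
    await asyncio.sleep(0.25)
    messages.append("main work done")

    # Wait for the background task to finish before exiting
    await task
    return messages

result = asyncio.run(main())
print(result)
```

The heartbeats interleave with the main work because every `await` gives the background task a chance to run.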

# Making Concurrent HTTP Requests

Async HTTP requests are a common real-world use case where async programming provides immediate benefits. When multiple APIs are called sequentially, the total execution time becomes the sum of all response delays. Async allows these requests to run concurrently.

This list contains three URLs that intentionally delay their responses by one, two, and three seconds.

import asyncio
import time
import urllib.request
import json

URLS = (
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/2",
    "https://httpbin.org/delay/3",
)

This function makes a blocking HTTP request using the standard library. It cannot be awaited directly.

def fetch_sync(url):
    """Blocking HTTP request using standard library"""
    with urllib.request.urlopen(url) as response:
        return json.loads(response.read().decode())

The fetch coroutine measures execution time and logs when the request is initiated. The blocking HTTP request is offloaded to a background thread using asyncio.to_thread, which prevents the event loop from blocking.

async def fetch(url):
    start = time.perf_counter()
    print(f"Fetching {url}")

    # Run blocking IO in a thread
    data = await asyncio.to_thread(fetch_sync, url)

    elapsed = time.perf_counter() - start
    print(f"Finished {url} in {elapsed:.2f} seconds")

    return data

All requests are scheduled concurrently using asyncio.gather.

async def main():
    start = time.perf_counter()

    results = await asyncio.gather(
        *(fetch(url) for url in URLS)
    )

    total = time.perf_counter() - start
    print(f"\nFetched {len(results)} responses")
    print(f"(TOTAL WALL TIME) {total:.2f} seconds")

asyncio.run(main())

Output:

  • All three HTTP requests start almost immediately
  • Each request completes after its own delay
  • Longest request determines total wall time
  • Total runtime is about three and a half seconds, not the sum of all delays

Fetching https://httpbin.org/delay/1
Fetching https://httpbin.org/delay/2
Fetching https://httpbin.org/delay/3
Finished https://httpbin.org/delay/1 in 1.26 seconds
Finished https://httpbin.org/delay/2 in 2.20 seconds
Finished https://httpbin.org/delay/3 in 3.52 seconds

Fetched 3 responses
(TOTAL WALL TIME) 3.52 seconds

This approach significantly improves performance when calling multiple APIs and is a common pattern in modern async Python services.
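The same to_thread pattern applies to any blocking call, not only HTTP. This network-free sketch (the blocking_read helper is a hypothetical stand-in) shows the latency overlap without depending on an external service:

```python
import asyncio
import time

def blocking_read(n):
    # Stand-in for any blocking call: a file read, a legacy client, etc.
    time.sleep(0.2)
    return f"payload {n}"

async def main():
    start = time.perf_counter()
    # Each blocking call runs in its own worker thread via to_thread,
    # so the event loop can overlap their waits
    results = await asyncio.gather(
        *(asyncio.to_thread(blocking_read, n) for n in range(3))
    )
    elapsed = time.perf_counter() - start
    return results, elapsed

results, elapsed = asyncio.run(main())
print(results)
print(f"{elapsed:.2f} seconds")
```

Three 0.2-second blocking calls complete in roughly 0.2 seconds of wall time instead of 0.6, because the threads wait in parallel.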

# Implementing Error Handling Patterns in Async Python Applications

Strong async applications should handle failures gracefully. In concurrent systems, a single failed task should not cause the entire workflow to fail. Proper error handling ensures that successful tasks are completed while failures are clearly reported.

This list includes two successful endpoints and one endpoint that returns an HTTP 404 error.

import asyncio
import urllib.request
import urllib.error
import json
import socket

URLS = (
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/2",
    "https://httpbin.org/status/404",
)

This function makes a blocking HTTP request with a timeout. It may raise exceptions such as timeouts or HTTP errors.

def fetch_sync(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as response:
        return json.loads(response.read().decode())

This coroutine wraps the blocking HTTP request in a safe asynchronous interface. The blocking operation runs in a background thread via asyncio.to_thread, which prevents the event loop from stalling while the request is in progress.

Common failure cases such as timeouts and HTTP errors are caught and converted into structured responses. This ensures that errors are handled predictably and that a single failed request does not disrupt the execution of other concurrent tasks.

async def safe_fetch(url, timeout=5):
    try:
        return await asyncio.to_thread(fetch_sync, url, timeout)

    except socket.timeout:
        return {"url": url, "error": "timeout"}

    except urllib.error.HTTPError as e:
        return {"url": url, "error": "http_error", "status": e.code}

    except Exception as e:
        return {"url": url, "error": "unexpected_error", "message": str(e)}

All requests are executed concurrently using asyncio.gather.

async def main():
    results = await asyncio.gather(
        *(safe_fetch(url) for url in URLS)
    )

    for result in results:
        print(result)

asyncio.run(main())

Output:

  • The first two requests complete successfully and return parsed JSON data.
  • The third request returns a structured error instead of raising an exception
  • All results are returned at once without disrupting the workflow

{'args': {}, 'data': '', 'files': {}, 'form': {}, 'headers': {'Accept-Encoding': 'identity', 'Host': 'httpbin.org', 'User-Agent': 'Python-urllib/3.11', 'X-Amzn-Trace-Id': 'Root=1-6966269f-1cd7fc7821bc6bc469e9ba64'}, 'origin': '3.85.143.193', 'url': 'https://httpbin.org/delay/1'}
{'args': {}, 'data': '', 'files': {}, 'form': {}, 'headers': {'Accept-Encoding': 'identity', 'Host': 'httpbin.org', 'User-Agent': 'Python-urllib/3.11', 'X-Amzn-Trace-Id': 'Root=1-6966269f-5f59c151487be7094b2b0b3c'}, 'origin': '3.85.143.193', 'url': 'https://httpbin.org/delay/2'}
{'url': 'https://httpbin.org/status/404', 'error': 'http_error', 'status': 404}

This pattern ensures that a single failed request does not break the entire async operation and is essential for production-ready async applications.
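As a complementary approach, asyncio.gather itself accepts return_exceptions=True, which delivers raised exceptions as result values instead of propagating the first failure and cancelling the batch. A sketch with a hypothetical might_fail coroutine:

```python
import asyncio

async def might_fail(n):
    await asyncio.sleep(0.1)
    if n == 2:
        raise ValueError(f"task {n} failed")
    return f"task {n} ok"

async def main():
    # return_exceptions=True returns exceptions in place of results,
    # so one failure does not disturb the other tasks
    return await asyncio.gather(
        might_fail(1), might_fail(2), might_fail(3),
        return_exceptions=True,
    )

results = asyncio.run(main())
for r in results:
    print(r)
```

The caller then inspects each result with isinstance to separate successes from failures, which keeps error handling in one place instead of inside every coroutine.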

# Using Async Programming in Jupyter Notebook

Jupyter Notebooks already run an active event loop. Because of this, asyncio.run cannot be used inside a notebook cell, as it attempts to start a new event loop while one is already running.

This async function simulates a simple non-blocking task using asyncio.sleep.

import asyncio

async def main():
    await asyncio.sleep(1)
    print("Async task completed")

Wrong usage in notebook:

asyncio.run(main())  # RuntimeError: asyncio.run() cannot be called from a running event loop

Correct usage in notebook:

await main()

Understanding this difference ensures that async code runs correctly in Jupyter Notebooks and prevents common runtime errors when experimenting with asynchronous Python.

# Controlling Concurrency with Async Semaphores

External APIs and services often impose rate limits, making it unsafe to run too many requests at the same time. Async semaphores let you control how many tasks execute concurrently while still taking advantage of asynchronous execution.

The semaphore is initialized with a limit of two, meaning that only two tasks can enter the protected section at the same time.

import asyncio
import time

semaphore = asyncio.Semaphore(2)  # allow only 2 tasks at a time

The task coroutine represents an asynchronous unit of work. Each task must acquire the semaphore before executing, and if the limit is reached, it waits until a slot becomes available.

Once inside the semaphore, the task records its start time, prints a start message, and waits for two seconds of non-blocking sleep to simulate an I/O-bound operation. After the sleep is completed, the task calculates its execution time, prints a completion message, and releases the semaphore.

async def task(task_id):
    async with semaphore:
        start = time.perf_counter()
        print(f"Task {task_id} started")

        await asyncio.sleep(2)

        elapsed = time.perf_counter() - start
        print(f"Task {task_id} finished in {elapsed:.2f} seconds")

The main function schedules four tasks to run concurrently with asyncio.gather, but the semaphore ensures they execute in two waves of two tasks.

Finally, asyncio.run starts the event loop and runs the program, resulting in a total execution time of about four seconds.

async def main():
    start = time.perf_counter()

    await asyncio.gather(
        task(1),
        task(2),
        task(3),
        task(4),
    )

    total = time.perf_counter() - start
    print(f"\n(TOTAL WALL TIME) {total:.2f} seconds")

asyncio.run(main())

Output:

  • Tasks 1 and 2 start first due to semaphore limit
  • Tasks 3 and 4 wait until slots become available
  • Tasks are executed in two waves, each wave lasting two seconds
  • Total wall time is about four seconds

Task 1 started
Task 2 started
Task 1 finished in 2.00 seconds
Task 2 finished in 2.00 seconds
Task 3 started
Task 4 started
Task 3 finished in 2.00 seconds
Task 4 finished in 2.00 seconds

(TOTAL WALL TIME) 4.00 seconds

Semaphores provide an effective way to enforce concurrency limits and protect system stability in production async applications.
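The semaphore logic above can also be packaged into a reusable helper. The bounded_gather name below is hypothetical, a sketch rather than a standard asyncio API:

```python
import asyncio

async def bounded_gather(coros, limit):
    # Run coroutines concurrently, but at most `limit` at a time
    sem = asyncio.Semaphore(limit)

    async def bounded(coro):
        async with sem:
            return await coro

    # gather preserves the input order of results
    return await asyncio.gather(*(bounded(c) for c in coros))

async def job(n):
    await asyncio.sleep(0.1)
    return n * 2

results = asyncio.run(bounded_gather([job(n) for n in range(5)], limit=2))
print(results)  # [0, 2, 4, 6, 8]
```

Wrapping each coroutine rather than threading the semaphore through every task keeps the rate-limiting concern in one place.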

# Concluding Remarks

Async programming is not a universal solution. It is not suitable for CPU-intensive workloads such as machine learning training, image processing or numerical simulation. Its strength lies in handling I/O-bound operations where latency dominates execution.

When used correctly, async programming improves throughput by allowing tasks to progress while others are waiting. Proper use of await is essential for concurrency, and async patterns are particularly effective in API-driven and service-based systems.

In a production environment, controlling concurrency and explicitly handling failures is important for building reliable and scalable async Python applications.

Abid Ali Awan (@1Abidaliyawan) is a certified data scientist professional who loves building machine learning models. Currently, he focuses on content creation and writing technical blogs on machine learning and data science technologies. Abid holds a master’s degree in technology management and a bachelor’s degree in telecommunications engineering. His vision is to build AI products using graph neural networks for students struggling with mental illness.
