
Python Async/Await: The Complete Guide for Developers

By Rachel

Python’s asyncio is the standard for writing concurrent code — but most developers bounce off it. The syntax looks simple, but the mental model is different. This guide covers everything from basic async/await to building production-ready async systems.

TL;DR
  • async def defines a coroutine function; await suspends the current coroutine until the awaited operation finishes, letting the event loop run other tasks
  • Use asyncio.gather() for concurrent execution, asyncio.create_task() for background work
  • Never call time.sleep() in async code — use asyncio.sleep()
  • aiohttp for HTTP, asyncpg for PostgreSQL, redis.asyncio (part of redis-py, which absorbed aioredis) for Redis
  • Use asyncio.Queue for producer/consumer patterns

Why Async?

Synchronous Python handles one thing at a time per thread. For I/O-bound work (HTTP requests, database queries, file operations), this wastes time waiting.

Synchronous (blocking):

plaintext
Request 1 → Wait 100ms → Response → Request 2 → Wait 100ms → Response
Total: 200ms

Asynchronous (concurrent):

plaintext
Request 1 → Request 2 → Wait 100ms → Response 1 → Response 2
Total: 100ms

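The Scenario: Your API makes 5 independent database calls per request, each taking about 100ms. Run one after another, that’s 500ms. Run concurrently (each on its own pooled connection), it’s roughly 100ms, the duration of the slowest call.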
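
The arithmetic above can be checked with a small timing sketch. It simulates five independent 100ms database calls with asyncio.sleep(); fake_db_call and the 0.1s latency are stand-ins chosen for illustration, not a real driver.

```python
import asyncio
import time

async def fake_db_call(i: int) -> int:
    # Hypothetical stand-in for a query that takes about 100 ms
    await asyncio.sleep(0.1)
    return i

async def run_sequential() -> float:
    start = time.perf_counter()
    for i in range(5):
        await fake_db_call(i)      # each call waits for the previous one
    return time.perf_counter() - start

async def run_concurrent() -> float:
    start = time.perf_counter()
    await asyncio.gather(*(fake_db_call(i) for i in range(5)))  # all five wait at once
    return time.perf_counter() - start

sequential_s = asyncio.run(run_sequential())
concurrent_s = asyncio.run(run_concurrent())
print(f"sequential: {sequential_s:.2f}s, concurrent: {concurrent_s:.2f}s")
```

Exact timings vary by machine, but the sequential run should land near 0.5s and the concurrent run near 0.1s.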

Basic Syntax

async def and await

python
import asyncio

async def fetch_data():
    print("Starting fetch...")
    await asyncio.sleep(1)  # Non-blocking sleep
    print("Fetch complete!")
    return {"data": "value"}

async def main():
    result = await fetch_data()
    print(result)

# Run the async program
asyncio.run(main())

Key rules:

  • await only works inside async def functions
  • asyncio.run() starts the event loop and runs your main coroutine
  • Regular functions can’t use await; to run a coroutine from synchronous code, pass it to asyncio.run()
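
One consequence of these rules is easy to miss: calling an async def function does not run it. The sketch below (greet is a made-up example function) shows that the call only builds a coroutine object, and that the body executes once the event loop drives it.

```python
import asyncio
import inspect

async def greet(name: str) -> str:
    return f"hello, {name}"

coro = greet("world")                # nothing has executed yet
is_coro = inspect.iscoroutine(coro)  # the call produced a coroutine object
message = asyncio.run(coro)          # the event loop runs it to completion
print(is_coro, message)
```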

The Event Loop

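The event loop is the scheduler at the heart of asyncio: it runs one task until that task hits an await, then switches to another task that is ready. You can drive it manually with the low-level API: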

python
import asyncio

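# Create an event loop explicitly
# (asyncio.get_event_loop() is deprecated when no loop is running)
loop = asyncio.new_event_loop()
try:
    # Schedule a coroutine (fetch_data() is defined in the previous example)
    task = loop.create_task(fetch_data())

    # Run until complete
    result = loop.run_until_complete(task)
finally:
    # Release the loop's resources
    loop.close()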

In practice, use asyncio.run() and let Python manage the loop.
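
To see the loop switching between tasks, here is a minimal sketch with two hypothetical workers. Each records a step and then yields with asyncio.sleep(0), which hands control back to the loop without any real delay.

```python
import asyncio

log = []

async def worker(name: str) -> None:
    for step in range(2):
        log.append(f"{name}{step}")
        await asyncio.sleep(0)  # yield to the event loop so other tasks can run

async def main() -> None:
    await asyncio.gather(worker("A"), worker("B"))

asyncio.run(main())
print(log)
```

The log shows steps from A and B interleaved rather than all of A followed by all of B: every await is a point where the loop may switch tasks.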

Running Multiple Tasks

Sequential (Slow)

python
async def sequential():
    result1 = await fetch_data()  # 1 second
    result2 = await fetch_data()  # 1 second
    result3 = await fetch_data()  # 1 second
    return [result1, result2, result3]
# Total: 3 seconds

Concurrent (Fast)

python
async def concurrent():
    tasks = [
        fetch_data(),
        fetch_data(),
        fetch_data()
    ]
    results = await asyncio.gather(*tasks)
    return results
# Total: ~1 second

With Error Handling

python
async def concurrent_with_errors():
    tasks = [
        asyncio.create_task(fetch_data()),
        asyncio.create_task(risky_operation()),
        asyncio.create_task(another_fetch())
    ]
    
    # return_exceptions=True collects exceptions as results instead of raising the first one
    results = await asyncio.gather(*tasks, return_exceptions=True)
    
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Task {i} failed: {result}")
        else:
            print(f"Task {i} succeeded: {result}")
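
For a version you can run as-is, the following sketch swaps the undefined helpers for two toy coroutines (ok and boom are illustrative names). It shows that results come back in the order the awaitables were passed, with the exception object sitting in the failed slot.

```python
import asyncio

async def ok(value: int) -> int:
    await asyncio.sleep(0.01)
    return value

async def boom() -> int:
    await asyncio.sleep(0.01)
    raise ValueError("simulated failure")

async def main() -> list:
    # Results keep the order of the arguments, not the order of completion
    return await asyncio.gather(ok(1), boom(), ok(3), return_exceptions=True)

results = asyncio.run(main())
failures = [r for r in results if isinstance(r, BaseException)]
successes = [r for r in results if not isinstance(r, BaseException)]
print(successes, failures)
```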

asyncio.wait() for Fine Control

python
async def wait_for_first():
    # asyncio.wait() needs Task objects; bare coroutines are rejected on Python 3.11+
    tasks = [
        asyncio.create_task(fetch_data()),
        asyncio.create_task(slow_operation()),
        asyncio.create_task(fast_operation()),
    ]

    # Return when first completes
    done, pending = await asyncio.wait(
        tasks,
        return_when=asyncio.FIRST_COMPLETED
    )

    # Cancel remaining tasks
    for task in pending:
        task.cancel()

    # Get results from completed tasks
    for task in done:
        try:
            result = task.result()
            print(f"Completed: {result}")
        except Exception as e:
            print(f"Failed: {e}")
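
Here is a self-contained sketch of the same first-one-wins idea, with a hypothetical respond() coroutine standing in for real operations. It also awaits the cancelled tasks, so their cancellation has actually finished before the function returns.

```python
import asyncio

async def respond(name: str, delay: float) -> str:
    await asyncio.sleep(delay)
    return name

async def first_response() -> tuple:
    tasks = [
        asyncio.create_task(respond("fast", 0.01)),
        asyncio.create_task(respond("slow", 1.0)),
    ]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)

    for task in pending:
        task.cancel()
    # Wait for the cancelled tasks so cancellation is fully processed
    await asyncio.gather(*pending, return_exceptions=True)

    winner = next(iter(done)).result()
    return winner, all(task.cancelled() for task in pending)

winner, losers_cancelled = asyncio.run(first_response())
print(winner, losers_cancelled)
```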

Real HTTP Requests

The requests library is blocking, so calling it inside a coroutine stalls the event loop. Use aiohttp instead:

python
import aiohttp
import asyncio

async def fetch_url(session: aiohttp.ClientSession, url: str):
    async with session.get(url) as response:
        return await response.text()

async def fetch_all_urls(urls: list[str]):
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        return results

# Usage
urls = [
    'https://api.github.com/users/python',
    'https://api.github.com/users/google',
    'https://api.github.com/users/microsoft'
]
results = asyncio.run(fetch_all_urls(urls))

Connection Pooling

python
async def fetch_with_pool(urls: list[str]):
    # Limit concurrent connections: 100 in total, 10 per host
    connector = aiohttp.TCPConnector(limit=100, limit_per_host=10)
    timeout = aiohttp.ClientTimeout(total=30)  # seconds, for the whole request

    async with aiohttp.ClientSession(
        connector=connector,
        timeout=timeout
    ) as session:
        tasks = [fetch_url(session, url) for url in urls]
        return await asyncio.gather(*tasks)

Database Operations

PostgreSQL with asyncpg

python
import asyncpg
import asyncio

async def fetch_users():
    # Connect to database
    conn = await asyncpg.connect(
        host='localhost',
        database='mydb',
        user='user',
        password='pass'
    )
    
    try:
        # Fetch multiple rows
        rows = await conn.fetch('SELECT * FROM users WHERE active = $1', True)
        
        # Fetch single row
        user = await conn.fetchrow(
            'SELECT * FROM users WHERE id = $1',
            123
        )
        
        # Execute write operation
        await conn.execute(
            'INSERT INTO users(name, email) VALUES($1, $2)',
            'John', 'john@example.com'  # placeholder address
        )
        
        return rows
    finally:
        await conn.close()

# Connection pooling
async def fetch_with_pool():
    # In a real application, create the pool once at startup and reuse it
    pool = await asyncpg.create_pool(
        host='localhost',
        database='mydb',
        user='user',
        password='pass',
        min_size=5,
        max_size=20
    )

    try:
        async with pool.acquire() as conn:
            return await conn.fetch('SELECT * FROM users')
    finally:
        # Runs even though the block above returns
        await pool.close()

Transactions

python
async def transfer_funds(from_id: int, to_id: int, amount: float):
    conn = await asyncpg.connect(dsn='postgresql://...')
    
    try:
        async with conn.transaction():
            # Both operations succeed or both fail
            await conn.execute(
                'UPDATE accounts SET balance = balance - $1 WHERE id = $2',
                amount, from_id
            )
            await conn.execute(
                'UPDATE accounts SET balance = balance + $1 WHERE id = $2',
                amount, to_id
            )
    finally:
        await conn.close()

Producer/Consumer Pattern

Use asyncio.Queue for decoupled processing:

python
import asyncio
from asyncio import Queue

async def producer(queue: Queue, name: str):
    for i in range(5):
        item = f"{name}-item-{i}"
        await queue.put(item)
        print(f"{name} produced: {item}")
        await asyncio.sleep(0.5)

async def consumer(queue: Queue, name: str):
    while True:
        item = await queue.get()
        if item is None:  # Poison pill to stop
            break
        
        print(f"{name} consumed: {item}")
        await asyncio.sleep(1)  # Simulate processing
        queue.task_done()

async def main():
    queue = Queue(maxsize=10)
    
    # Start producers
    producers = [
        asyncio.create_task(producer(queue, f"P{i}"))
        for i in range(2)
    ]
    
    # Start consumers
    consumers = [
        asyncio.create_task(consumer(queue, f"C{i}"))
        for i in range(3)
    ]
    
    # Wait for producers to finish
    await asyncio.gather(*producers)
    
    # Signal consumers to stop
    for _ in consumers:
        await queue.put(None)
    
    await asyncio.gather(*consumers)

asyncio.run(main())
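
The example above stops consumers with a poison pill. A common alternative, sketched here under the assumption that all items are queued up front, is to call queue.join() and then cancel the workers. This is where task_done() earns its keep: join() returns only after every item has been marked done. The doubling step is a placeholder for real processing.

```python
import asyncio

async def worker(queue: asyncio.Queue, processed: list) -> None:
    while True:
        item = await queue.get()
        try:
            await asyncio.sleep(0)        # placeholder for real async work
            processed.append(item * 2)
        finally:
            queue.task_done()             # lets queue.join() track completion

async def main() -> list:
    queue = asyncio.Queue()
    processed = []
    for i in range(10):
        queue.put_nowait(i)

    workers = [asyncio.create_task(worker(queue, processed)) for _ in range(3)]
    await queue.join()                    # returns once every item is marked done

    for w in workers:
        w.cancel()                        # workers loop forever, so stop them explicitly
    await asyncio.gather(*workers, return_exceptions=True)
    return processed

processed = asyncio.run(main())
print(sorted(processed))
```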

Web Frameworks

FastAPI (Native Async)

python
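from contextlib import asynccontextmanager

import asyncpg
from fastapi import FastAPI, HTTPException

# Database pool, created at startup
pool = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup: runs before the app accepts requests
    global pool
    pool = await asyncpg.create_pool(dsn='postgresql://...')
    yield
    # Shutdown: runs after the app stops serving requests
    await pool.close()

# lifespan replaces the deprecated @app.on_event("startup") / ("shutdown") hooks
app = FastAPI(lifespan=lifespan)

@app.get("/users/{user_id}")
async def get_user(user_id: int):
    async with pool.acquire() as conn:
        user = await conn.fetchrow(
            'SELECT * FROM users WHERE id = $1',
            user_id
        )
    if user is None:
        # Return a real 404 instead of a 200 response with an error body
        raise HTTPException(status_code=404, detail="User not found")
    return dict(user)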

@app.get("/users")
async def list_users():
    async with pool.acquire() as conn:
        rows = await conn.fetch('SELECT * FROM users')
        return [dict(row) for row in rows]

Background Tasks

python
import asyncio

from fastapi import BackgroundTasks

async def send_email(email: str, message: str):
    await asyncio.sleep(2)  # Simulate sending
    print(f"Email sent to {email}")

@app.post("/signup")
async def signup(email: str, background_tasks: BackgroundTasks):
    # Save user to database
    await save_user(email)
    
    # Don't block response — send email in background
    background_tasks.add_task(send_email, email, "Welcome!")
    
    return {"status": "success"}

Common Pitfalls

1. Blocking the Event Loop

python
import asyncio
import time

# WRONG: Blocks entire event loop
async def wrong():
    time.sleep(5)  # Blocks everything!
    return "done"

# RIGHT: Use async version
async def right():
    await asyncio.sleep(5)  # Yields control
    return "done"
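
The difference is easy to observe. This sketch starts a background ticker task and counts how many times it gets to run while an operation is in progress; count_ticks_during, blocking_op, and cooperative_op are names made up for the demonstration.

```python
import asyncio
import time

async def count_ticks_during(operation) -> int:
    ticks = 0

    async def ticker() -> None:
        nonlocal ticks
        while True:
            await asyncio.sleep(0.01)
            ticks += 1

    task = asyncio.create_task(ticker())
    await operation()
    seen = ticks                 # ticks the ticker managed while operation ran
    task.cancel()
    await asyncio.gather(task, return_exceptions=True)
    return seen

async def blocking_op() -> None:
    time.sleep(0.1)              # holds the thread; the loop cannot switch tasks

async def cooperative_op() -> None:
    await asyncio.sleep(0.1)     # suspends; the loop runs the ticker meanwhile

blocked_ticks = asyncio.run(count_ticks_during(blocking_op))
cooperative_ticks = asyncio.run(count_ticks_during(cooperative_op))
print(blocked_ticks, cooperative_ticks)
```

The ticker makes no progress during the blocking version and ticks several times during the cooperative one.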

2. Running Sync Code in Async

python
import asyncio
from concurrent.futures import ThreadPoolExecutor

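# Thread pool for blocking (synchronous) calls.
# For CPU-bound work prefer a ProcessPoolExecutor: in standard CPython
# the GIL keeps threads from running Python code in parallel.
executor = ThreadPoolExecutor(max_workers=4)

async def run_blocking(data):
    # Run in thread pool to avoid blocking event loop
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(
        executor,
        blocking_call,  # placeholder: any synchronous function
        data
    )
    return result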
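
On Python 3.9 and later, asyncio.to_thread() is a shorter way to do the same thing using the loop's default thread pool. A minimal sketch, with blocking_io as a made-up stand-in for a synchronous library call:

```python
import asyncio
import time

def blocking_io(n: int) -> int:
    time.sleep(0.1)   # placeholder for a blocking call, e.g. a sync SDK
    return n * n

async def main() -> tuple:
    start = time.perf_counter()
    # Each call runs in its own worker thread, so the waits overlap
    results = await asyncio.gather(
        *(asyncio.to_thread(blocking_io, n) for n in range(4))
    )
    return results, time.perf_counter() - start

results, elapsed = asyncio.run(main())
print(results, f"{elapsed:.2f}s")
```

The four 100ms calls should finish in roughly 0.1s total rather than 0.4s, because the event loop stays free while the threads wait.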

3. Forgetting to await

python
# WRONG: Returns coroutine object, not result
async def wrong():
    result = fetch_data()  # Missing await!
    print(result)  # <coroutine object fetch_data>

# RIGHT:
async def right():
    result = await fetch_data()
    print(result)

4. Creating Too Many Tasks

python
# WRONG: Unbounded concurrency
async def wrong(session, urls):  # imagine urls holds 10,000 entries
    tasks = [fetch_url(session, url) for url in urls]
    return await asyncio.gather(*tasks)

# RIGHT: Use semaphore to limit concurrency
async def fetch_limited(semaphore, session, url):
    async with semaphore:
        return await fetch_url(session, url)

async def right(session, urls):
    # At most 100 requests in flight at any moment
    semaphore = asyncio.Semaphore(100)
    tasks = [fetch_limited(semaphore, session, url) for url in urls]
    return await asyncio.gather(*tasks)
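
To confirm that a semaphore really caps concurrency, this sketch tracks how many jobs are inside the guarded section at once. The limit of 3 and the job count of 10 are arbitrary values for the demonstration.

```python
import asyncio

async def main(limit: int = 3, jobs: int = 10) -> int:
    semaphore = asyncio.Semaphore(limit)
    in_flight = 0
    peak = 0

    async def job() -> None:
        nonlocal in_flight, peak
        async with semaphore:
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)   # placeholder for the real request
            in_flight -= 1

    await asyncio.gather(*(job() for _ in range(jobs)))
    return peak

peak = asyncio.run(main())
print(peak)
```

All ten jobs are created immediately, but the peak never exceeds the semaphore's limit.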

Advanced Patterns

Context Managers

python
from contextlib import asynccontextmanager

@asynccontextmanager
async def managed_resource():
    resource = await acquire_resource()
    try:
        yield resource
    finally:
        await release_resource(resource)

async def use_resource():
    async with managed_resource() as r:
        await r.do_something()
    # Automatically released
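
The cleanup guarantee also holds when the body raises. Here is a runnable sketch (managed_connection is an illustrative name, and the events list simply records the order of operations):

```python
import asyncio
from contextlib import asynccontextmanager

events = []

@asynccontextmanager
async def managed_connection(name: str):
    events.append(f"open {name}")
    await asyncio.sleep(0)              # placeholder for async setup
    try:
        yield name
    finally:
        events.append(f"close {name}")  # runs on success and on error

async def main() -> None:
    try:
        async with managed_connection("db") as conn:
            events.append(f"use {conn}")
            raise RuntimeError("simulated failure")
    except RuntimeError:
        events.append("handled")

asyncio.run(main())
print(events)
```

The close step is recorded before the exception reaches the except block.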

Async Generators

python
async def stream_large_dataset():
    conn = await asyncpg.connect(dsn='postgresql://...')
    try:
        async with conn.transaction():
            async for record in conn.cursor('SELECT * FROM large_table'):
                yield record
    finally:
        await conn.close()

# Usage (async for must run inside a coroutine)
async def consume():
    async for record in stream_large_dataset():
        process(record)
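
The same pattern works without a database. The sketch below pages through a fake API (fetch_page and its three hard-coded pages are invented for the example) and yields records one at a time, so the caller never holds more than one page in memory.

```python
import asyncio

async def fetch_page(page: int) -> list:
    # Hypothetical paged API: three pages of data, then an empty page
    await asyncio.sleep(0.01)
    data = {0: [1, 2], 1: [3, 4], 2: [5]}
    return data.get(page, [])

async def iter_records():
    page = 0
    while True:
        batch = await fetch_page(page)
        if not batch:
            return                 # an empty page ends the generator
        for record in batch:
            yield record
        page += 1

async def main() -> list:
    # Async comprehensions consume async generators
    return [record async for record in iter_records()]

records = asyncio.run(main())
print(records)
```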

Timeouts

python
async def fetch_with_timeout():
    try:
        result = await asyncio.wait_for(
            fetch_data(),
            timeout=5.0
        )
        return result
    except asyncio.TimeoutError:
        return {"error": "Request timed out"}
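
It helps to know what happens to the operation that timed out: asyncio.wait_for() cancels it, so the coroutine sees a CancelledError and can clean up. A small sketch, where slow_operation and the state dict are illustrative:

```python
import asyncio

state = {"cancelled": False}

async def slow_operation() -> str:
    try:
        await asyncio.sleep(10)
        return "finished"
    except asyncio.CancelledError:
        state["cancelled"] = True   # cleanup hook: release resources here
        raise                       # always re-raise so cancellation completes

async def main() -> str:
    try:
        return await asyncio.wait_for(slow_operation(), timeout=0.05)
    except asyncio.TimeoutError:
        return "timed out"

outcome = asyncio.run(main())
print(outcome, state)
```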

Rate Limiting

python
import asyncio

class RateLimiter:
    def __init__(self, calls_per_second: float):
        self.interval = 1.0 / calls_per_second
        self.last_call = 0.0
        # On Python < 3.10, create the limiter inside the running loop
        self.lock = asyncio.Lock()

    async def acquire(self):
        # The lock serializes callers so each one waits its turn
        async with self.lock:
            loop = asyncio.get_running_loop()
            elapsed = loop.time() - self.last_call

            if elapsed < self.interval:
                await asyncio.sleep(self.interval - elapsed)

            self.last_call = loop.time()

# Usage
limiter = RateLimiter(calls_per_second=10)

async def rate_limited_fetch(session, url):
    await limiter.acquire()
    return await fetch_url(session, url)

Testing Async Code

pytest-asyncio

python
import pytest

@pytest.mark.asyncio
async def test_fetch_data():
    result = await fetch_data()
    assert result["data"] == "value"

@pytest.mark.asyncio
async def test_concurrent_fetches():
    tasks = [fetch_data() for _ in range(5)]
    results = await asyncio.gather(*tasks)
    assert len(results) == 5

Mocking Async Functions

python
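import pytest
from unittest.mock import AsyncMock, patch

import module  # placeholder: the module that defines fetch_data

@pytest.mark.asyncio
async def test_with_mock():
    with patch('module.fetch_data', new_callable=AsyncMock) as mock:
        mock.return_value = {"mocked": True}

        # Look the function up through the module so the patched version is used
        result = await module.fetch_data()
        assert result == {"mocked": True}
        mock.assert_awaited_once()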
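
When the dependency is passed in rather than imported, you can skip patch() entirely and hand the code an AsyncMock. A minimal sketch, with get_username and fetch_user as hypothetical names:

```python
import asyncio
from unittest.mock import AsyncMock

async def get_username(client, user_id: int) -> str:
    user = await client.fetch_user(user_id)
    return user["name"]

async def main() -> str:
    client = AsyncMock()                               # attributes are AsyncMocks too
    client.fetch_user.return_value = {"name": "Ada"}   # value produced when awaited
    name = await get_username(client, 42)
    client.fetch_user.assert_awaited_once_with(42)     # verifies the await and its argument
    return name

name = asyncio.run(main())
print(name)
```

Inside a pytest-asyncio test, the body of main() would go directly in the test function.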

Testing Time-Based Code

python
import pytest
from unittest.mock import AsyncMock, patch

@pytest.mark.asyncio
async def test_sleep():
    with patch('asyncio.sleep', new_callable=AsyncMock) as mock_sleep:
        await my_async_function()

        # Verify sleep was awaited with the expected duration
        mock_sleep.assert_awaited_once_with(1.0)

Performance Comparison

Approach                  100 HTTP Requests   Memory Usage
Synchronous (requests)    ~50 seconds         Low
Threading                 ~10 seconds         High
Asyncio (aiohttp)         ~2 seconds          Low

When NOT to Use Async

Use Case                          Better Alternative
CPU-intensive work                Multiprocessing
Simple scripts                    Regular sync code
Database without async driver     Sync with connection pooling
Single request                    Don’t add complexity

Migration Strategy

Moving from sync to async:

python
# Step 1: Identify I/O bottlenecks
# Step 2: Add async versions alongside sync
# Step 3: Migrate incrementally
# Step 4: Remove sync versions

# Bridge pattern
async def async_api():
    return await internal_async_impl()

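def sync_api():
    # Only for callers with no running event loop;
    # asyncio.run() raises RuntimeError when called inside one
    return asyncio.run(async_api())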
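
The bridge has one sharp edge worth seeing in action: asyncio.run() refuses to start when an event loop is already running in the same thread. In this sketch the bodies are placeholders, and the bridge closes the unused coroutine on failure to avoid a "never awaited" warning.

```python
import asyncio

async def async_api() -> str:
    await asyncio.sleep(0)    # placeholder for real async work
    return "ok"

def sync_api() -> str:
    coro = async_api()
    try:
        return asyncio.run(coro)
    except RuntimeError:
        coro.close()          # avoid a "coroutine was never awaited" warning
        raise

async def call_from_async() -> str:
    try:
        return sync_api()     # wrong place to use the bridge
    except RuntimeError as exc:
        return f"refused: {exc}"

from_sync = sync_api()
from_async = asyncio.run(call_from_async())
print(from_sync)
print(from_async)
```

From plain synchronous code the bridge works; from inside a coroutine, call await async_api() directly instead.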

Summary

  • async/await: core syntax for non-blocking code
  • asyncio.gather(): run tasks concurrently
  • Use async libraries: aiohttp, asyncpg, redis.asyncio
  • Avoid blocking: never call time.sleep() or requests.get() in async code
  • Control concurrency: use Semaphore to cap the number of in-flight tasks
  • Test properly: use pytest-asyncio and AsyncMock

Async Python unlocks massive performance gains for I/O-bound applications. The learning curve is worth it.


Written By

Rachel

Senior Full-stack Developer specializing in backend systems. Expert in database optimization, asynchronous programming in Python, and building resilient API architectures.
