AsyncIO at Scale: Backpressure, Structured Concurrency, and Cancellation Semantics

Published: August 12, 2020 (5y ago) · 14 min read

Updated: November 20, 2024 (8mo ago)

Most async outages don’t look like crashes—they look like slow motion. Queues grow silently, memory creeps up, latencies stretch, and only then do errors appear. The fix isn’t a bigger box; it’s discipline: explicit backpressure, structured concurrency, and cancellation that’s part of the contract.

This post is a production‑first field guide. We’ll build a shared mental model, lay down primitives that keep systems honest, and show runnable patterns you can lift into services. We’ll start with backpressure and deadlines you can trust.

The shape of overload (and how to stop it)

Backpressure is the system’s ability to tell producers “not so fast” and make that stick. In asyncio, you’ll usually enforce it at three layers:

  • Bounded admission at the edge (queues, semaphores, rate limits)
  • Flow control at transports/streams (drain() pauses writers)
  • Deadlines on work (budgeted waits, not forever waits)

sequenceDiagram
  participant P as Producer(s)
  participant Q as Bounded Queue (maxsize=N)
  participant C as Consumers (M workers)
  P->>Q: put(item)
  Note over Q: size < N → accept
  Q-->>C: get() → process
  P--xQ: put(item, timeout=T)
  Note over P,Q: size == N → wait up to T, else shed/timeout
  C-->>Q: task_done()

Key rule: if you cannot bound the queue, you have not implemented backpressure—you’ve only moved the problem.

A minimal, bounded producer–consumer that won’t melt

The queue is your pressure gauge. Set a size that reflects real memory limits, add timeouts so producers don’t wait forever, and decide your overload behavior (shed, degrade, or back off).

# examples: bounded_queue.py
import asyncio
from collections.abc import Iterable
 
async def producer(name: str, q: asyncio.Queue[str], items: Iterable[str]) -> None:
    for item in items:
        try:
            # Apply backpressure at admission with a deadline
            await asyncio.wait_for(q.put(item), timeout=0.250)
            print(f"{name}: queued {item} (qsize={q.qsize()})")
        except asyncio.TimeoutError:
            # Overload policy: shed or log + retry with jitter
            print(f"{name}: queue full, shedding {item}")
        await asyncio.sleep(0)  # yield; keep system cancelable
 
async def worker(name: str, q: asyncio.Queue[str]) -> None:
    while True:
        item = await q.get()   # cancellation-safe wait point
        try:
            await asyncio.sleep(0.050)  # simulate I/O work
            print(f"{name}: processed {item}")
        finally:
            q.task_done()
 
async def main() -> None:
    q: asyncio.Queue[str] = asyncio.Queue(maxsize=64)  # bounded!
 
    # Start a few consumers
    workers = [asyncio.create_task(worker(f"W{i}", q)) for i in range(4)]
 
    # Bursty producers
    items = [f"item-{i}" for i in range(500)]
    await asyncio.gather(
        producer("P1", q, items[0::2]),
        producer("P2", q, items[1::2]),
    )
 
    # Wait for all work to drain, then cancel workers
    await q.join()
    for t in workers:
        t.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
 
if __name__ == "__main__":
    asyncio.run(main())

Guidance:

  • Set maxsize based on a memory budget you’re willing to spend under stress.
  • Carry deadlines on put()/get()—don’t suspend producers forever.
  • Decide and document overload behavior (drop, backoff, or degrade).
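
One way to make the "drop" policy explicit is a non-blocking admission helper. The sketch below is illustrative only: `offer` and `Stats` are hypothetical names that do not appear in the example above. It relies on `Queue.put_nowait()`, which raises `asyncio.QueueFull` when the queue is already at `maxsize`.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class Stats:
    """Hypothetical counters you might export as metrics."""
    accepted: int = 0
    shed: int = 0


def offer(q: asyncio.Queue[str], item: str, stats: Stats) -> bool:
    """Try to enqueue without waiting; count and drop the item if the queue is full."""
    try:
        q.put_nowait(item)
    except asyncio.QueueFull:
        stats.shed += 1
        return False
    stats.accepted += 1
    return True


async def main() -> Stats:
    q: asyncio.Queue[str] = asyncio.Queue(maxsize=2)
    stats = Stats()
    for i in range(5):  # no consumer is running, so only 2 items fit
        offer(q, f"item-{i}", stats)
    return stats


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Compared with `wait_for(q.put(...))`, this variant never suspends the producer, which suits edges where a fast rejection is cheaper than a short wait.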

Admission control with semaphores (cheap and effective)

When there isn’t a natural queue boundary (e.g., fan‑out RPCs), use a semaphore to cap in‑flight work. This is backpressure at the call‑site.

# examples: semaphore_limit.py
import asyncio
 
semaphore = asyncio.Semaphore(200)  # global cap on concurrency
 
async def fetch(url: str) -> bytes:
    await asyncio.sleep(0.010)  # stand‑in for real I/O
    return ("ok:" + url).encode()
 
async def process_one(url: str) -> None:
    async with semaphore:  # admission gate
        data = await fetch(url)
        # ... downstream work
 
async def process_all(urls: list[str]) -> None:
    async with asyncio.TaskGroup() as tg:  # Python 3.11+
        for u in urls:
            tg.create_task(process_one(u))
 
if __name__ == "__main__":
    asyncio.run(process_all([f"https://ex/{i}" for i in range(10_000)]))

Notes:

  • Prefer a single shared Semaphore per bottlenecked resource (e.g., remote API, DB pool). Size it from measurements, not guesses.
  • Pair with short deadlines to avoid piling up stuck work (see next section).

Stream flow control you should actually use (drain())

asyncio transports/streams implement write‑side flow control. write() only buffers data; once the buffer grows past the high‑water mark, await drain() suspends the writer until the buffer falls back below the low‑water mark. That await is the backpressure signal.

# examples: stream_backpressure.py
import asyncio
 
async def send_many(writer: asyncio.StreamWriter, chunks: list[bytes]) -> None:
    for chunk in chunks:
        writer.write(chunk)
        # This await cooperates with the transport's high‑water marks
        await writer.drain()
 
async def main() -> None:
    reader, writer = await asyncio.open_connection("example.com", 80)
    try:
        await send_many(writer, [b"GET / HTTP/1.1\r\nHost: example.com\r\n\r\n"]) 
        await reader.read(1024)
    finally:
        writer.close()
        await writer.wait_closed()
 
if __name__ == "__main__":
    asyncio.run(main())

If you skip drain(), you disable a core safety valve and can inflate memory under bursts.

Deadlines over timeouts: budget every await

Time‑bounded systems carry a deadline from ingress to leaf calls and derive budgets per step. In Python 3.11+, use asyncio.timeout(); in earlier versions, use asyncio.wait_for().

# examples: deadlines.py
import asyncio
 
async def call_downstream() -> None:
    await asyncio.sleep(0.150)  # stand‑in for real I/O
 
async def handler() -> None:
    # Global request budget: 120 ms
    try:
        async with asyncio.timeout(0.120):  # Python 3.11+
            await call_downstream()
            # pass a reduced budget to children if needed
    except TimeoutError:
        # Map to a 504/overload response, release resources
        return
 
if __name__ == "__main__":
    asyncio.run(handler())

Budget tips:

  • Put a number on every external await. No silent, unbounded waits.
  • On breach, cancel promptly and return a clear, idempotent failure.
  • Derive sub‑budgets rather than reusing the full deadline at every layer.

flowchart TD
  A[Ingress: deadline 200 ms] --> B[Queue put (10 ms budget)]
  B --> C[Worker: downstream RPC (120 ms budget)]
  C --> D[Storage write (40 ms budget)]
  D --> E[Respond]
  style A fill:#e1f5fe
  style B fill:#fff3e0
  style C fill:#e8f5e8
  style D fill:#fff3e0

Operational guardrails you can enable today

  • Export gauges: queue lengths, semaphore permits in use, stream/backlog metrics.
  • Alert on sustained admission failures, deadline breaches, and retries.
  • Prefer shedding early to building giant buffers—fail fast beats failing late.
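
As a rough illustration of the "permits in use" gauge, a thin wrapper around a semaphore can track current and peak in‑flight counts. `Gate` and `admit` are hypothetical names; wiring the numbers into your metrics system is left out on purpose.

```python
import asyncio
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager


class Gate:
    """Semaphore wrapper that tracks in-flight and peak counts (hypothetical helper)."""

    def __init__(self, limit: int) -> None:
        self._sem = asyncio.Semaphore(limit)
        self.in_flight = 0
        self.peak = 0

    @asynccontextmanager
    async def admit(self) -> AsyncIterator[None]:
        async with self._sem:
            self.in_flight += 1
            self.peak = max(self.peak, self.in_flight)
            try:
                yield
            finally:
                self.in_flight -= 1  # also runs on error and cancellation


async def job(gate: Gate) -> None:
    async with gate.admit():
        await asyncio.sleep(0.01)  # stand-in for real I/O


async def main() -> Gate:
    gate = Gate(limit=8)
    await asyncio.gather(*(job(gate) for _ in range(50)))
    return gate


if __name__ == "__main__":
    g = asyncio.run(main())
    print(f"peak={g.peak} in_flight={g.in_flight}")
```

Sampling `gate.in_flight` on a timer gives the gauge; a peak that sits at the limit for long stretches is a sign that admission is the bottleneck.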

Structured concurrency: make lifetimes explicit

Unstructured concurrency spawns tasks that can outlive their parents. That’s how you get leaks, stuck shutdowns, and ghost work. Structured concurrency gives every task a parent scope that is responsible for starting it, awaiting it, and cleaning it up.

In Python 3.11+, asyncio.TaskGroup brings this model to the standard library.

flowchart TD
  P[Parent Scope] -->|create_task| T1[Child A]
  P -->|create_task| T2[Child B]
  P -->|create_task| T3[Child C]
  subgraph Rules
    R1[No child may outlive the parent]
    R2[First error cancels siblings]
    R3[Exceptions aggregate via ExceptionGroup]
  end

TaskGroup 101: start, await, propagate

# examples: taskgroup_basics.py
import asyncio
 
async def ok(delay: float) -> str:
    await asyncio.sleep(delay)
    return "ok"
 
async def boom(delay: float) -> str:
    await asyncio.sleep(delay)
    raise RuntimeError("child failed")
 
async def main() -> None:
    try:
        async with asyncio.TaskGroup() as tg:
            t1 = tg.create_task(ok(0.050))
            t2 = tg.create_task(boom(0.075))
            t3 = tg.create_task(ok(0.100))
        # We only reach here if all children finished successfully
        print(t1.result(), t3.result())
    except* RuntimeError as eg:  # PEP 654
        # Siblings are cancelled, and failures are grouped
        for e in eg.exceptions:
            print("caught:", e)
 
if __name__ == "__main__":
    asyncio.run(main())

Semantics to rely on:

  • If any child raises, the group cancels remaining children and exits by raising an ExceptionGroup.
  • You can handle specific error types with except* TypeError/except* RuntimeError while letting others bubble.
  • No child can leak past the context: scopes guarantee cleanup.

Ordered, bounded concurrency (map) with TaskGroup

Bounded fan‑out is common, and preserving input order in results is often required. Combine a semaphore with a TaskGroup and index results.

# examples: bounded_map.py
import asyncio
from collections.abc import Awaitable, Callable, Iterable
from typing import TypeVar, cast

T = TypeVar("T")
U = TypeVar("U")

async def bounded_map(
    func: Callable[[T], Awaitable[U]],  # func must return an awaitable
    items: Iterable[T],
    *,
    limit: int,
) -> list[U]:
    sem = asyncio.Semaphore(limit)
    items_list = list(items)
    results: list[U | None] = [None] * len(items_list)

    async def run_one(i: int, x: T) -> None:
        async with sem:
            results[i] = await func(x)

    async with asyncio.TaskGroup() as tg:
        for i, x in enumerate(items_list):
            tg.create_task(run_one(i, x))
    # Reaching this point means every task succeeded; on failure the
    # TaskGroup raises instead. Return every slot: filtering out None
    # would shift indices whenever func legitimately returns None.
    return cast("list[U]", results)
 
# Demo stub
async def work(x: int) -> int:
    await asyncio.sleep(0.01)
    return x * x
 
async def main() -> None:
    out = await bounded_map(work, range(1000), limit=64)
    assert out[10] == 100
 
if __name__ == "__main__":
    asyncio.run(main())

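Why not a bare gather? It imposes no concurrency limit: every coroutine starts running at once, and on failure it propagates the first exception while the other awaitables keep running. The TaskGroup + semaphore pattern still creates one task per item, but it caps how many are doing work at any moment and propagates errors cleanly.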

Failure handling with ExceptionGroup (granular)

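When one or more children fail, the TaskGroup raises an ExceptionGroup, even if only a single child raised. Use except* to triage by type; re‑raise what you cannot handle, and note that exceptions no except* clause matches propagate on their own.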

# examples: exception_group.py
import asyncio
 
class RetryableError(RuntimeError): ...
class FatalError(RuntimeError): ...
 
async def maybe(i: int) -> int:
    await asyncio.sleep(0.01)
    if i % 7 == 0: raise RetryableError(i)
    if i % 11 == 0: raise FatalError(i)
    return i
 
async def main() -> None:
    try:
        async with asyncio.TaskGroup() as tg:
            for i in range(1, 50):
                tg.create_task(maybe(i))
    except* RetryableError as eg:
        print(f"retryable count: {len(eg.exceptions)}")
    except* FatalError as eg:
        # Treat as hard failure
        raise
 
if __name__ == "__main__":
    asyncio.run(main())

Small, shielded cleanup regions (only when needed)

Default: let cancellation interrupt work, and use try/finally to release resources. Occasionally, you must ensure cleanup cannot be cancelled (closing a socket, flushing a log record). Keep shields tiny.

# examples: shielded_cleanup.py
import asyncio
 
async def handler() -> None:
    writer: asyncio.StreamWriter | None = None
    try:
        reader, writer = await asyncio.open_connection("example.com", 80)
        writer.write(b"GET / HTTP/1.0\r\nHost: example.com\r\n\r\n")
        await writer.drain()
        await asyncio.wait_for(reader.read(1024), timeout=0.5)
    except Exception:
        # map/log
        pass
    finally:
        if writer is not None:
            # close() is synchronous, so it runs even while we are being
            # cancelled; awaiting anything before it could skip the close.
            writer.close()
            try:
                # Shield only the minimal close handshake
                await asyncio.shield(writer.wait_closed())
            except Exception:
                pass
 
if __name__ == "__main__":
    asyncio.run(handler())

Keep shields short and bounded; never wrap main work in shield().

Supervising sub‑scopes with budgets

Group related child work under its own scope and budget. If one fails or the budget expires, cancel the group.

# examples: scoped_budget.py
import asyncio
 
async def child(name: str) -> None:
    await asyncio.sleep(0.2)
    print("done", name)
 
async def parent() -> None:
    try:
        async with asyncio.timeout(0.15):  # total budget for the group
            async with asyncio.TaskGroup() as tg:
                tg.create_task(child("A"))
                tg.create_task(child("B"))
    except TimeoutError:
        # Group cancelled on budget expiry
        print("group timed out")
 
if __name__ == "__main__":
    asyncio.run(parent())

sequenceDiagram
  participant Parent
  participant Group
  participant A as Child A
  participant B as Child B
  Parent->>Group: open scope (deadline 150 ms)
  Group->>A: start
  Group->>B: start
  Note over Group: deadline expires
  Group-->>A: cancel
  Group-->>B: cancel
  Group-->>Parent: raise TimeoutError

Cancellation as a contract (prompt, routable, idempotent)

Cancellation is not “best effort.” Treat it as a hard contract: when the system asks work to stop, it must stop promptly, free resources, and leave the system in a consistent state.

Three principles:

  • Prompt: every long operation must have cancellation points (awaits or explicit checks)
  • Routable: cancellation flows along ownership (parent → children, scopes)
  • Idempotent: cleanup can run more than once without harm

sequenceDiagram
  participant Parent
  participant Group as TaskGroup
  participant A as Child A
  participant B as Child B
  Parent->>Group: cancel()
  Group-->>A: inject CancelledError at next await
  Group-->>B: inject CancelledError at next await
  A-->>Group: finally: release resources
  B-->>Group: finally: release resources
  Group-->>Parent: done
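
The "idempotent" principle is the easiest one to get wrong, so here is a minimal sketch of a resource whose release can safely run twice. `Lease` is a hypothetical stand‑in for a lock, connection, or temp file; the call counter exists only to make the behaviour visible.

```python
import asyncio


class Lease:
    """Stand-in for a resource such as a lock, connection, or temp file."""

    def __init__(self) -> None:
        self.released = False
        self.release_calls = 0

    def release(self) -> None:
        self.release_calls += 1
        if self.released:  # idempotent: second and later calls are no-ops
            return
        self.released = True


async def child(lease: Lease) -> None:
    try:
        await asyncio.sleep(10)  # cancellation point
    finally:
        lease.release()  # runs on success, error, and cancellation


async def parent() -> Lease:
    lease = Lease()
    task = asyncio.create_task(child(lease))
    await asyncio.sleep(0.01)  # let the child reach its await
    task.cancel()  # routed from the owner to the child
    try:
        await task
    except asyncio.CancelledError:
        pass  # expected here: this scope requested the cancellation
    finally:
        lease.release()  # defensive second release is harmless
    return lease


if __name__ == "__main__":
    result = asyncio.run(parent())
    print(result.released, result.release_calls)
```

With cleanup written this way, it does not matter whether the child, the parent, or both end up running the release path.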

Make tasks cancellation‑friendly (cooperative)

Long tasks must reach an await regularly or check for cancellation explicitly. If you have CPU work, move it off the loop (to_thread or a pool) or slice it.

# examples: cancel_friendly.py
import asyncio
 
async def compute(n: int) -> int:
    # Slice CPU work to remain cancellable
    acc = 0
    for i in range(n):
        # periodic yield allows cancellation injection
        if i % 1000 == 0:
            await asyncio.sleep(0)
        acc += i
    return acc
 
async def worker() -> None:
    try:
        r = await compute(5_000_000)
        print("result", r)
    except asyncio.CancelledError:
        # release/rollback here
        raise  # never swallow
 
async def main() -> None:
    t = asyncio.create_task(worker())
    await asyncio.sleep(0.01)
    t.cancel()
    try:
        await t
    except asyncio.CancelledError:
        print("cancelled promptly")
 
if __name__ == "__main__":
    asyncio.run(main())

Rules of thumb:

  • Do not block the event loop with sync I/O. Use timeouts and async drivers, or asyncio.to_thread for legacy calls.
  • Always re‑raise CancelledError after local cleanup.
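  • Avoid broad handlers that swallow cancellations. Since Python 3.8, CancelledError derives from BaseException, so except Exception no longer catches it, but a bare except: or except BaseException: still does (as does except Exception on 3.7 and earlier). If you must catch broadly, add a fast path: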
try:
    ...
except asyncio.CancelledError:
    raise
except Exception as e:
    ...  # handle real errors

Timeouts and propagation: timeout vs wait_for

Prefer asyncio.timeout() (3.11+) to scope a deadline. It cancels awaited operations and raises TimeoutError to the caller. If you need to preserve a background task across a timeout, shield it deliberately.

# examples: timeout_vs_shield.py
import asyncio
 
async def background() -> None:
    try:
        await asyncio.sleep(2)
    except asyncio.CancelledError:
        print("bg cancelled")
        raise
 
async def main() -> None:
    task = asyncio.create_task(background())
    try:
        async with asyncio.timeout(0.1):
            await asyncio.shield(task)   # keep background alive past timeout
    except TimeoutError:
        print("scope timed out; bg still running")
    await task  # now wait for it to finish (or cancel explicitly)
 
if __name__ == "__main__":
    asyncio.run(main())

Use shielding sparingly; it deliberately breaks propagation.

AnyIO/Trio ergonomics: cancellation scopes

If you’re on AnyIO/Trio, cancellation is explicit via scopes, and shielded regions are a named concept. The model maps cleanly to the ideas above.

# examples: anyio_scopes.py
import anyio
 
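async def main() -> None:
    # The scope must enclose the task group: children run under the group's
    # own scope, so a deadline opened inside the group would not reach them.
    with anyio.move_on_after(0.1) as scope:  # deadline for this block
        async with anyio.create_task_group() as tg:
            tg.start_soon(anyio.sleep, 1.0)
    if scope.cancelled_caught:
        print("deadline hit; children cancelled")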
 
if __name__ == "__main__":
    anyio.run(main)

Fast, graceful shutdown

Graceful shutdown is a cancellation scenario with external triggers (SIGTERM, K8s preStop). Keep it bounded and observable.

# examples: graceful_shutdown.py
import asyncio, signal
 
async def serve() -> None:
    # pretend this is your server loop
    while True:
        await asyncio.sleep(1)
 
async def main() -> None:
    loop = asyncio.get_running_loop()
    stop = asyncio.Event()
    loop.add_signal_handler(signal.SIGTERM, stop.set)
    loop.add_signal_handler(signal.SIGINT, stop.set)
 
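    server = asyncio.create_task(serve())
    watcher = asyncio.create_task(stop.wait())
    # A TaskGroup would block here until *all* children finish, so wait for
    # the first one instead: either serve() finished or stop was signalled.
    await asyncio.wait({server, watcher}, return_when=asyncio.FIRST_COMPLETED)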
    # Cancel remaining work and wait a short drain deadline
    tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
    for t in tasks: t.cancel()
    try:
        async with asyncio.timeout(2.0):
            await asyncio.gather(*tasks, return_exceptions=True)
    except TimeoutError:
        pass
 
if __name__ == "__main__":
    asyncio.run(main())

Guidance:

  • Install signal handlers once at process start. loop.add_signal_handler() is Unix‑only; on Windows, use alternative triggers.
  • Put a small drain deadline on shutdown; don’t hang forever.
  • Ensure background loops observe cancellation quickly (awaits inside).

Proving cancellation is prompt

Bake "cancel‑to‑done" SLOs into tests. Measure the time from cancel() to completion and assert bounds.

# examples: test_cancel_latency.py
import asyncio, time, pytest
 
async def sleeper():
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        await asyncio.sleep(0)  # minimal cleanup
        raise
 
@pytest.mark.asyncio
async def test_cancel_latency_under_50ms():
    t = asyncio.create_task(sleeper())
    await asyncio.sleep(0.01)
    start = time.perf_counter()
    t.cancel()
    with pytest.raises(asyncio.CancelledError):
        await t
    assert (time.perf_counter() - start) < 0.050

Common footguns (and fixes)

  • Bare except: and except BaseException swallow CancelledError in asyncio (on Python 3.7 and earlier, so does except Exception). Add an explicit except asyncio.CancelledError: raise first.
  • Long CPU loops block cancellation. Slice work, insert short awaits, or push to threads/processes.
  • Blocking libraries in async code. Prefer async clients or to_thread, and keep a timeout.
  • asyncio.gather without return_exceptions=True raises the first exception and leaves the other awaitables running unobserved. Prefer TaskGroup for normal work; use gather(..., return_exceptions=True) only in shutdown paths, and inspect the results it returns.
  • Async generators: ensure aclose() is awaited (use async with helpers) to avoid resource leaks on cancellation.

Putting it together: a small, honest service skeleton

This sketch combines admission control, structured concurrency, and deadlines. Swap the fake I/O for real handlers.

# examples: service_skeleton.py
import asyncio
from typing import Any
 
MAX_INFLIGHT = 256
REQ_DEADLINE_S = 0.250
 
class Service:
    def __init__(self) -> None:
        self._sema = asyncio.Semaphore(MAX_INFLIGHT)
 
    async def handle(self, req: dict[str, Any]) -> dict[str, Any]:
        async with self._sema:
            try:
                async with asyncio.timeout(REQ_DEADLINE_S):
                    async with asyncio.TaskGroup() as tg:
                        t1 = tg.create_task(self._call_backend("A", req))
                        t2 = tg.create_task(self._call_backend("B", req))
                    return {"a": t1.result(), "b": t2.result()}
            except TimeoutError:
                return {"error": "deadline"}
 
    async def _call_backend(self, name: str, req: dict[str, Any]) -> str:
        await asyncio.sleep(0.015)
        return f"ok-{name}"
 
async def main() -> None:
    svc = Service()
    # Simulate many requests
    async with asyncio.TaskGroup() as tg:
        for i in range(1000):
            tg.create_task(svc.handle({"id": i}))
 
if __name__ == "__main__":
    asyncio.run(main())

Guidance:

  • Cap in‑flight work at the top; prefer to shed/503 fast under overload.
  • Carry a single request deadline and derive sub‑budgets if you fan out.
  • Keep child work inside a group for predictable cleanup and propagation.
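
In the skeleton above, a request that finds the semaphore exhausted waits for a permit, and that wait sits outside the deadline. To shed fast instead, one option is to check `Semaphore.locked()` before acquiring. The sketch below is a simplified, hypothetical variant (`SheddingService`, the 503 payload, and the tiny limit are all illustrative); it is safe because there is no await between the check and the acquire.

```python
import asyncio
from typing import Any

MAX_INFLIGHT = 4  # deliberately tiny for the demo


class SheddingService:
    def __init__(self) -> None:
        self._sema = asyncio.Semaphore(MAX_INFLIGHT)

    async def handle(self, req: dict[str, Any]) -> dict[str, Any]:
        if self._sema.locked():  # no permits left: reject instead of queueing
            return {"status": 503, "error": "overloaded"}
        async with self._sema:  # does not wait: a permit was free just above
            await asyncio.sleep(0.01)  # stand-in for real work
            return {"status": 200, "id": req["id"]}


async def main() -> list[dict[str, Any]]:
    svc = SheddingService()
    return await asyncio.gather(*(svc.handle({"id": i}) for i in range(10)))


if __name__ == "__main__":
    for response in asyncio.run(main()):
        print(response)
```

A middle ground is to allow a short, budgeted wait for a permit and shed only when that wait expires.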

Fault injection and load realism

Validate behavior under latency, drops, and bandwidth caps. A simple harness:

# Start toxiproxy and simulate a slow/downstream service on localhost:9000
docker run --rm -p 8474:8474 -p 9000:9000 shopify/toxiproxy
 
# Create a proxy and add latency with jitter so client deadlines start to fire
curl -sX POST localhost:8474/proxies \
  -d '{"name":"slow","listen":"0.0.0.0:9000","upstream":"example.com:80"}' | cat
curl -sX POST localhost:8474/proxies/slow/toxics \
  -d '{"name":"latency","type":"latency","stream":"downstream","attributes":{"latency":120,"jitter":40}}' | cat

Point your client at localhost:9000, watch deadlines fire, error budgets trip, and ensure queue/semaphore gauges stay bounded.

Observability and SLOs

  • Export: queue sizes, semaphore permits in use, in‑flight per endpoint, per‑status counts, cancel latencies, deadline breaches.
  • Trace: propagate deadlines via context and add attributes (budget_ms, shed=true/false).
  • SLOs: p95 latency and error budgets per endpoint; alert on sustained budget burn.

uvloop note

uvloop can reduce overhead for network‑heavy services. Verify correctness and measure tail latencies before adopting.

# examples: uvloop_enable.py
import asyncio
import uvloop
 
def main() -> None:
    asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
    asyncio.run(asyncio.sleep(0))
 
if __name__ == "__main__":
    main()

Production checklist

  • Bound every queue and cap in‑flight work with semaphores.
  • Put deadlines on external awaits; cancel on breach; return idempotent failures.
  • Use TaskGroup for related child work; handle failures with except*.
  • Keep shielded regions tiny and only for cleanup/handshakes.
  • Prove cancel‑to‑done latency in tests; add shutdown drains with small timeouts.
  • Instrument backpressure and cancellations; alert on sustained pressure.
  • Load test with latency and drops (toxiproxy); check p95 and memory flatness.

References