Most async outages don’t look like crashes—they look like slow motion. Queues grow silently, memory creeps up, latencies stretch, and only then do errors appear. The fix isn’t a bigger box; it’s discipline: explicit backpressure, structured concurrency, and cancellation that’s part of the contract.
This post is a production‑first field guide. We’ll build a shared mental model, lay down primitives that keep systems honest, and show runnable patterns you can lift into services. We’ll start with backpressure and deadlines you can trust.
## The shape of overload (and how to stop it)
Backpressure is the system’s ability to tell producers “not so fast” and make that stick. In `asyncio`, you’ll usually enforce it at three layers:
- Bounded admission at the edge (queues, semaphores, rate limits)
- Flow control at transports/streams (`drain()` pauses writers)
- Deadlines on work (budgeted waits, not forever waits)
Key rule: if you cannot bound the queue, you have not implemented backpressure—you’ve only moved the problem.
### A minimal, bounded producer–consumer that won’t melt
The queue is your pressure gauge. Set a size that reflects real memory limits, add timeouts so producers don’t wait forever, and decide your overload behavior (shed, degrade, or back off).
```python
# examples: bounded_queue.py
import asyncio
from collections.abc import Iterable

async def producer(name: str, q: asyncio.Queue[str], items: Iterable[str]) -> None:
    for item in items:
        try:
            # Apply backpressure at admission with a deadline
            await asyncio.wait_for(q.put(item), timeout=0.250)
            print(f"{name}: queued {item} (qsize={q.qsize()})")
        except asyncio.TimeoutError:
            # Overload policy: shed or log + retry with jitter
            print(f"{name}: queue full, shedding {item}")
            await asyncio.sleep(0)  # yield; keep system cancelable

async def worker(name: str, q: asyncio.Queue[str]) -> None:
    while True:
        item = await q.get()  # cancellation-safe wait point
        try:
            await asyncio.sleep(0.050)  # simulate I/O work
            print(f"{name}: processed {item}")
        finally:
            q.task_done()

async def main() -> None:
    q: asyncio.Queue[str] = asyncio.Queue(maxsize=64)  # bounded!
    # Start a few consumers
    workers = [asyncio.create_task(worker(f"W{i}", q)) for i in range(4)]
    # Bursty producers
    items = [f"item-{i}" for i in range(500)]
    await asyncio.gather(
        producer("P1", q, items[0::2]),
        producer("P2", q, items[1::2]),
    )
    # Wait for all work to drain, then cancel workers
    await q.join()
    for t in workers:
        t.cancel()
    await asyncio.gather(*workers, return_exceptions=True)

if __name__ == "__main__":
    asyncio.run(main())
```
Guidance:
- Set `maxsize` based on a memory budget you’re willing to spend under stress.
- Carry deadlines on `put()`/`get()`; don’t suspend producers forever.
- Decide and document overload behavior (drop, back off, or degrade); a backoff sketch follows this list.
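If you choose backoff over shedding, keep it bounded and jittered so stalled producers don’t retry in lockstep. A minimal sketch, assuming Python 3.11+ (the attempt count, timeout, and delay caps are illustrative):

```python
# examples: backoff_put.py — bounded, jittered retry as an overload policy
# (attempts, timeout, and delay caps are illustrative; tune to your budget)
import asyncio
import random

async def put_with_backoff(q: asyncio.Queue[str], item: str, attempts: int = 3) -> bool:
    delay = 0.010
    for _ in range(attempts):
        try:
            await asyncio.wait_for(q.put(item), timeout=0.050)
            return True
        except TimeoutError:
            await asyncio.sleep(delay * random.random())  # full jitter
            delay = min(delay * 2, 0.200)                 # capped exponential
    return False  # caller decides: shed, degrade, or surface an error
```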
### Admission control with semaphores (cheap and effective)
When there isn’t a natural queue boundary (e.g., fan‑out RPCs), use a semaphore to cap in‑flight work. This is backpressure at the call‑site.
```python
# examples: semaphore_limit.py
import asyncio

semaphore = asyncio.Semaphore(200)  # global cap on concurrency

async def fetch(url: str) -> bytes:
    await asyncio.sleep(0.010)  # stand-in for real I/O
    return ("ok:" + url).encode()

async def process_one(url: str) -> None:
    async with semaphore:  # admission gate
        data = await fetch(url)
        # ... downstream work

async def process_all(urls: list[str]) -> None:
    async with asyncio.TaskGroup() as tg:  # Python 3.11+
        for u in urls:
            tg.create_task(process_one(u))

if __name__ == "__main__":
    asyncio.run(process_all([f"https://ex/{i}" for i in range(10_000)]))
```
Notes:
- Prefer a single shared `Semaphore` per bottlenecked resource (e.g., remote API, DB pool). Size it from measurements, not guesses.
- Pair with short deadlines to avoid piling up stuck work (see the next section).
### Stream flow control you should actually use (`drain()`)
`asyncio` transports and streams implement write‑side flow control. When buffers grow past a high‑water mark, writers must await `drain()`; that await is the backpressure signal.
```python
# examples: stream_backpressure.py
import asyncio

async def send_many(writer: asyncio.StreamWriter, chunks: list[bytes]) -> None:
    for chunk in chunks:
        writer.write(chunk)
        # This await cooperates with the transport's high-water marks
        await writer.drain()

async def main() -> None:
    reader, writer = await asyncio.open_connection("example.com", 80)
    try:
        await send_many(writer, [b"GET / HTTP/1.1\r\nHost: example.com\r\n\r\n"])
        await reader.read(1024)
    finally:
        writer.close()
        await writer.wait_closed()

if __name__ == "__main__":
    asyncio.run(main())
```
If you skip `drain()`, you disable a core safety valve and can inflate memory under bursts.
### Deadlines over timeouts: budget every await
Time‑bounded systems carry a deadline from ingress to leaf calls and derive per‑step budgets from it. On Python 3.11+, use `asyncio.timeout()`; on earlier versions, use `asyncio.wait_for()`.
```python
# examples: deadlines.py
import asyncio

async def call_downstream() -> None:
    await asyncio.sleep(0.150)  # stand-in for real I/O

async def handler() -> None:
    # Global request budget: 120 ms
    try:
        async with asyncio.timeout(0.120):  # Python 3.11+
            await call_downstream()
            # pass a reduced budget to children if needed
    except TimeoutError:
        # Map to a 504/overload response, release resources
        return

if __name__ == "__main__":
    asyncio.run(handler())
```
Budget tips:
- Put a number on every external await. No silent, unbounded waits.
- On breach, cancel promptly and return a clear, idempotent failure.
- Derive sub‑budgets rather than reusing the full deadline at every layer; a minimal sketch follows.
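One way to derive sub‑budgets is to carry an absolute deadline and let each step take only a slice of what remains. A minimal sketch, assuming Python 3.11+ (the `Deadline` helper is illustrative, not a stdlib API):

```python
# examples: sub_budgets.py — derive per-step budgets from one ingress deadline
# (the Deadline helper is illustrative, not part of asyncio)
import asyncio

class Deadline:
    def __init__(self, budget_s: float) -> None:
        self._expires = asyncio.get_running_loop().time() + budget_s

    def remaining(self) -> float:
        return max(0.0, self._expires - asyncio.get_running_loop().time())

async def step(dl: Deadline, fraction: float) -> None:
    # Each step spends at most a fraction of whatever budget is left
    async with asyncio.timeout(dl.remaining() * fraction):
        await asyncio.sleep(0.010)  # stand-in for real I/O

async def handler() -> None:
    dl = Deadline(0.120)  # single ingress budget: 120 ms
    await step(dl, 0.5)   # auth, cache lookup, ...
    await step(dl, 0.9)   # main downstream call gets most of the rest

if __name__ == "__main__":
    asyncio.run(handler())
```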
### Operational guardrails you can enable today
- Export gauges: queue lengths, semaphore permits in use, stream/backlog metrics (a sampling sketch follows this list).
- Alert on sustained admission failures, deadline breaches, and retries.
- Prefer shedding early to building giant buffers; failing fast beats failing late.
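A periodic sampling loop is often enough to get these gauges out the door; in this sketch the `print` stands in for a real metrics client (Prometheus, StatsD, or similar):

```python
# examples: gauges.py — periodic backpressure gauges (a sketch)
import asyncio

INFLIGHT = 0  # increment/decrement around your admission gate

async def export_gauges(q: asyncio.Queue[str], period_s: float = 1.0) -> None:
    while True:
        # print() stands in for a real metrics client
        print(f"qsize={q.qsize()} inflight={INFLIGHT}")
        await asyncio.sleep(period_s)

async def main() -> None:
    q: asyncio.Queue[str] = asyncio.Queue(maxsize=64)
    sampler = asyncio.create_task(export_gauges(q, period_s=0.1))
    await q.put("demo")
    await asyncio.sleep(0.25)
    sampler.cancel()

if __name__ == "__main__":
    asyncio.run(main())
```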
## Structured concurrency: make lifetimes explicit
Unstructured concurrency spawns tasks that can outlive their parents. That’s how you get leaks, stuck shutdowns, and ghost work. Structured concurrency gives every task a parent scope that is responsible for starting it, awaiting it, and cleaning it up.
In Python 3.11+, `asyncio.TaskGroup` brings this model to the standard library.
### TaskGroup 101: start, await, propagate
```python
# examples: taskgroup_basics.py
import asyncio

async def ok(delay: float) -> str:
    await asyncio.sleep(delay)
    return "ok"

async def boom(delay: float) -> str:
    await asyncio.sleep(delay)
    raise RuntimeError("child failed")

async def main() -> None:
    try:
        async with asyncio.TaskGroup() as tg:
            t1 = tg.create_task(ok(0.050))
            t2 = tg.create_task(boom(0.075))
            t3 = tg.create_task(ok(0.100))
        # We only reach here if all children finished successfully
        print(t1.result(), t3.result())
    except* RuntimeError as eg:  # PEP 654
        # Siblings are cancelled, and failures are grouped
        for e in eg.exceptions:
            print("caught:", e)

if __name__ == "__main__":
    asyncio.run(main())
```
Semantics to rely on:
- If any child raises, the group cancels remaining children and exits by raising an `ExceptionGroup`.
- You can handle specific error types with `except* TypeError` / `except* RuntimeError` while letting others bubble.
- No child can leak past the context: scopes guarantee cleanup.
### Ordered, bounded concurrency (map) with TaskGroup
Bounded fan‑out is common, and preserving input order in results is often required. Combine a semaphore with a TaskGroup and index results.
```python
# examples: bounded_map.py
import asyncio
from collections.abc import Awaitable, Callable, Iterable
from typing import TypeVar, cast

T = TypeVar("T")
U = TypeVar("U")

async def bounded_map(
    func: Callable[[T], Awaitable[U]],
    items: Iterable[T],
    *,
    limit: int,
) -> list[U]:
    sem = asyncio.Semaphore(limit)
    items_list = list(items)
    results: list[U | None] = [None] * len(items_list)

    async def run_one(i: int, x: T) -> None:
        async with sem:
            results[i] = await func(x)

    async with asyncio.TaskGroup() as tg:
        for i, x in enumerate(items_list):
            tg.create_task(run_one(i, x))
    # If any task failed, the TaskGroup has already raised;
    # otherwise every slot is filled, so the cast is safe.
    return cast("list[U]", results)

# Demo stub
async def work(x: int) -> int:
    await asyncio.sleep(0.01)
    return x * x

async def main() -> None:
    out = await bounded_map(work, range(1000), limit=64)
    assert out[10] == 100

if __name__ == "__main__":
    asyncio.run(main())
```
Why not `gather`? It will happily spawn thousands of tasks at once. The `TaskGroup` + semaphore pattern enforces a hard cap and still propagates errors cleanly.
### Failure handling with ExceptionGroup (granular)
When multiple children fail, you’ll get an `ExceptionGroup`. Use `except*` to triage by type, then re‑raise remaining failures.
```python
# examples: exception_group.py
import asyncio

class RetryableError(RuntimeError): ...
class FatalError(RuntimeError): ...

async def maybe(i: int) -> int:
    await asyncio.sleep(0.01)
    if i % 7 == 0:
        raise RetryableError(i)
    if i % 11 == 0:
        raise FatalError(i)
    return i

async def main() -> None:
    try:
        async with asyncio.TaskGroup() as tg:
            for i in range(1, 50):
                tg.create_task(maybe(i))
    except* RetryableError as eg:
        print(f"retryable count: {len(eg.exceptions)}")
    except* FatalError as eg:
        # Treat as hard failure
        raise

if __name__ == "__main__":
    asyncio.run(main())
```
### Small, shielded cleanup regions (only when needed)
Default: let cancellation interrupt work, and use `try/finally` to release resources. Occasionally, you must ensure cleanup cannot be cancelled (closing a socket, flushing a log record). Keep shields tiny.
```python
# examples: shielded_cleanup.py
import asyncio

async def handler() -> None:
    writer: asyncio.StreamWriter | None = None
    try:
        reader, writer = await asyncio.open_connection("example.com", 80)
        writer.write(b"GET / HTTP/1.0\r\nHost: example.com\r\n\r\n")
        await writer.drain()
        await asyncio.wait_for(reader.read(1024), timeout=0.5)
    except Exception:
        # map/log
        pass
    finally:
        if writer is not None:
            # Shield only the minimal close handshake
            try:
                await asyncio.shield(writer.drain())
            except Exception:
                pass
            writer.close()
            try:
                await asyncio.shield(writer.wait_closed())
            except Exception:
                pass

if __name__ == "__main__":
    asyncio.run(handler())
```
Keep shields short and bounded; never wrap main work in `shield()`.
### Supervising sub‑scopes with budgets
Group related child work under its own scope and budget. If one fails or the budget expires, cancel the group.
```python
# examples: scoped_budget.py
import asyncio

async def child(name: str) -> None:
    await asyncio.sleep(0.2)
    print("done", name)

async def parent() -> None:
    try:
        async with asyncio.timeout(0.15):  # total budget for the group
            async with asyncio.TaskGroup() as tg:
                tg.create_task(child("A"))
                tg.create_task(child("B"))
    except TimeoutError:
        # Group cancelled on budget expiry
        print("group timed out")

if __name__ == "__main__":
    asyncio.run(parent())
```
## Cancellation as a contract (prompt, routable, idempotent)
Cancellation is not “best effort.” Treat it as a hard contract: when the system asks work to stop, it must stop promptly, free resources, and leave the system in a consistent state.
Three principles:
- Prompt: every long operation must have cancellation points (awaits or explicit checks).
- Routable: cancellation flows along ownership (parent → children, scopes); a minimal demonstration follows this list.
- Idempotent: cleanup can run more than once without harm.
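To see the routing concretely: cancelling the task that owns a `TaskGroup` delivers cancellation to every child in its scope. A minimal sketch:

```python
# examples: routable_cancel.py — one cancel at the root reaches every child
import asyncio

async def child(name: str) -> None:
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        print(f"{name}: cancelled by parent scope")
        raise  # cleanup done; never swallow

async def parent() -> None:
    async with asyncio.TaskGroup() as tg:
        tg.create_task(child("A"))
        tg.create_task(child("B"))

async def main() -> None:
    t = asyncio.create_task(parent())
    await asyncio.sleep(0.01)
    t.cancel()  # routed through the TaskGroup to both children
    try:
        await t
    except asyncio.CancelledError:
        print("parent and children stopped together")

if __name__ == "__main__":
    asyncio.run(main())
```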
### Make tasks cancellation‑friendly (cooperative)
Long tasks must reach an `await` regularly or check for cancellation explicitly. If you have CPU work, move it off the loop (`to_thread` or a pool) or slice it.
```python
# examples: cancel_friendly.py
import asyncio

async def compute(n: int) -> int:
    # Slice CPU work to remain cancellable
    acc = 0
    for i in range(n):
        # periodic yield allows cancellation injection
        if i % 1000 == 0:
            await asyncio.sleep(0)
        acc += i
    return acc

async def worker() -> None:
    try:
        r = await compute(5_000_000)
        print("result", r)
    except asyncio.CancelledError:
        # release/rollback here
        raise  # never swallow

async def main() -> None:
    t = asyncio.create_task(worker())
    await asyncio.sleep(0.01)
    t.cancel()
    try:
        await t
    except asyncio.CancelledError:
        print("cancelled promptly")

if __name__ == "__main__":
    asyncio.run(main())
```
Rules of thumb:
- Do not block the event loop with sync I/O. Use timeouts and async drivers, or `asyncio.to_thread` for legacy calls.
- Always re‑raise `CancelledError` after local cleanup.
- Avoid broad handlers that swallow cancellation: bare `except:` and `except BaseException` will catch it, and on Python < 3.8 `except Exception` did too (since 3.8, `CancelledError` derives from `BaseException`). If you must catch broadly, add a fast path:

```python
try:
    ...
except asyncio.CancelledError:
    raise
except Exception as e:
    ...  # handle real errors
```
### Timeouts and propagation: `timeout` vs `wait_for`
Prefer `asyncio.timeout()` (3.11+) to scope a deadline. It cancels awaited operations and raises `TimeoutError` to the caller. If you need to preserve a background task across a timeout, shield it deliberately.
```python
# examples: timeout_vs_shield.py
import asyncio

async def background() -> None:
    try:
        await asyncio.sleep(2)
    except asyncio.CancelledError:
        print("bg cancelled")
        raise

async def main() -> None:
    task = asyncio.create_task(background())
    try:
        async with asyncio.timeout(0.1):
            await asyncio.shield(task)  # keep background alive past timeout
    except TimeoutError:
        print("scope timed out; bg still running")
    await task  # now wait for it to finish (or cancel explicitly)

if __name__ == "__main__":
    asyncio.run(main())
```
Use shielding sparingly; it deliberately breaks propagation.
### AnyIO/Trio ergonomics: cancellation scopes
If you’re on AnyIO/Trio, cancellation is explicit via scopes, and shielded regions are a named concept. The model maps cleanly to the ideas above.
```python
# examples: anyio_scopes.py
import anyio

async def main() -> None:
    # The task group sits inside the cancel scope, so the deadline
    # governs the children; the outer task would not be cancelled
    # if the scope only wrapped start_soon() itself.
    with anyio.move_on_after(0.1) as scope:  # deadline for this block
        async with anyio.create_task_group() as tg:
            tg.start_soon(anyio.sleep, 1.0)
    if scope.cancel_called:
        print("deadline hit; children cancelled")

if __name__ == "__main__":
    anyio.run(main)
```
### Fast, graceful shutdown
Graceful shutdown is a cancellation scenario with external triggers (SIGTERM, K8s preStop). Keep it bounded and observable.
```python
# examples: graceful_shutdown.py
import asyncio
import signal

async def serve() -> None:
    # pretend this is your server loop
    while True:
        await asyncio.sleep(1)

async def main() -> None:
    loop = asyncio.get_running_loop()
    stop = asyncio.Event()
    loop.add_signal_handler(signal.SIGTERM, stop.set)
    loop.add_signal_handler(signal.SIGINT, stop.set)

    server = asyncio.create_task(serve())
    await stop.wait()  # park here until a shutdown signal arrives

    # Cancel remaining work and wait a short drain deadline
    tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
    for t in tasks:
        t.cancel()
    try:
        async with asyncio.timeout(2.0):
            await asyncio.gather(*tasks, return_exceptions=True)
    except TimeoutError:
        pass

if __name__ == "__main__":
    asyncio.run(main())
```

Note that a `TaskGroup` is the wrong tool here: its exit waits for all children, so a never‑ending `serve()` would block shutdown. A plain task plus `stop.wait()` keeps the trigger explicit.
Guidance:
- Install signal handlers once at process start. On Windows, `add_signal_handler` is not supported; use an alternative trigger (a portable sketch follows this list).
- Put a small drain deadline on shutdown; don’t hang forever.
- Ensure background loops observe cancellation quickly (awaits inside).
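For the Windows caveat above, one portable option is a plain `signal.signal` handler that hops back onto the loop. A sketch (signal delivery semantics vary by platform, so treat this as a starting point):

```python
# examples: portable_stop.py — a stop trigger without loop.add_signal_handler
import asyncio
import signal

def install_stop(loop: asyncio.AbstractEventLoop, stop: asyncio.Event) -> None:
    def _handler(signum: int, frame: object) -> None:
        # Signal handlers may run outside the event loop; re-enter it safely
        loop.call_soon_threadsafe(stop.set)

    signal.signal(signal.SIGINT, _handler)  # Ctrl+C works on Windows too
    if hasattr(signal, "SIGTERM"):
        signal.signal(signal.SIGTERM, _handler)
```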
### Proving cancellation is prompt
Bake "cancel‑to‑done" SLOs into tests. Measure the time from cancel()
to completion and assert bounds.
```python
# examples: test_cancel_latency.py
import asyncio
import time

import pytest

async def sleeper():
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        await asyncio.sleep(0)  # minimal cleanup
        raise

@pytest.mark.asyncio
async def test_cancel_latency_under_50ms():
    t = asyncio.create_task(sleeper())
    await asyncio.sleep(0.01)
    start = time.perf_counter()
    t.cancel()
    with pytest.raises(asyncio.CancelledError):
        await t
    assert (time.perf_counter() - start) < 0.050
```
### Common footguns (and fixes)
- Broad handlers swallow `CancelledError`: bare `except:` and `except BaseException` always do, and `except Exception` did before Python 3.8. Add an explicit `except asyncio.CancelledError: raise` first.
- Long CPU loops block cancellation. Slice work, insert short awaits, or push to threads/processes.
- Blocking libraries in async code. Prefer async clients or `to_thread`, and keep a timeout.
- `asyncio.gather` without `return_exceptions=True` in teardown can mask cancellations. Prefer `TaskGroup`, or use `gather(..., return_exceptions=True)` only in shutdown paths you’ll inspect.
- Async generators: ensure `aclose()` is awaited (use `async with` helpers) to avoid resource leaks on cancellation; a sketch follows this list.
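For the async‑generator item, `contextlib.aclosing` (Python 3.10+) is the standard‑library helper that guarantees `aclose()` runs. A minimal sketch:

```python
# examples: aclosing_gen.py — close async generators even on early exit
import asyncio
from contextlib import aclosing

async def ticker():
    try:
        i = 0
        while True:
            yield i
            i += 1
            await asyncio.sleep(0.01)
    finally:
        print("generator closed")  # runs even if the consumer breaks or is cancelled

async def main() -> None:
    async with aclosing(ticker()) as gen:  # guarantees aclose() is awaited
        async for n in gen:
            if n >= 3:
                break

if __name__ == "__main__":
    asyncio.run(main())
```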
## Putting it together: a small, honest service skeleton
This sketch combines admission control, structured concurrency, and deadlines. Swap the fake I/O for real handlers.
```python
# examples: service_skeleton.py
import asyncio
from typing import Any

MAX_INFLIGHT = 256
REQ_DEADLINE_S = 0.250

class Service:
    def __init__(self) -> None:
        self._sema = asyncio.Semaphore(MAX_INFLIGHT)

    async def handle(self, req: dict[str, Any]) -> dict[str, Any]:
        async with self._sema:
            try:
                async with asyncio.timeout(REQ_DEADLINE_S):
                    async with asyncio.TaskGroup() as tg:
                        t1 = tg.create_task(self._call_backend("A", req))
                        t2 = tg.create_task(self._call_backend("B", req))
                    # Read results only after the group has awaited its children
                    return {"a": t1.result(), "b": t2.result()}
            except TimeoutError:
                return {"error": "deadline"}

    async def _call_backend(self, name: str, req: dict[str, Any]) -> str:
        await asyncio.sleep(0.015)
        return f"ok-{name}"

async def main() -> None:
    svc = Service()
    # Simulate many requests
    async with asyncio.TaskGroup() as tg:
        for i in range(1000):
            tg.create_task(svc.handle({"id": i}))

if __name__ == "__main__":
    asyncio.run(main())
```
Guidance:
- Cap in‑flight work at the top; prefer to shed/503 fast under overload.
- Carry a single request deadline and derive sub‑budgets if you fan out.
- Keep child work inside a group for predictable cleanup and propagation.
### Fault injection and load realism
Validate behavior under latency, drops, and bandwidth caps. A simple harness:
```bash
# Start toxiproxy and simulate a slow downstream service on localhost:9000
docker run --rm -p 8474:8474 -p 9000:9000 shopify/toxiproxy

# Create a proxy and add latency with jitter
curl -sX POST localhost:8474/proxies \
  -d '{"name":"slow","listen":"0.0.0.0:9000","upstream":"example.com:80"}'
curl -sX POST localhost:8474/proxies/slow/toxics \
  -d '{"name":"latency","type":"latency","stream":"downstream","attributes":{"latency":120,"jitter":40}}'
```
Point your client at `localhost:9000`, watch deadlines fire and error budgets trip, and ensure queue/semaphore gauges stay bounded.
### Observability and SLOs
- Export: queue sizes, semaphore permits in use, in‑flight per endpoint, per‑status counts, cancel latencies, deadline breaches.
- Trace: propagate deadlines via context and add attributes (budget_ms, shed=true/false); a `contextvars` sketch follows this list.
- SLOs: p95 latency and error budgets per endpoint; alert on sustained budget burn.
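For deadline propagation specifically, `contextvars` lets every task in a request’s context read the same absolute deadline. A sketch assuming Python 3.11+ (`deadline_var` and `remaining()` are illustrative names, not a standard API):

```python
# examples: deadline_ctx.py — propagate one absolute deadline via contextvars
import asyncio
import contextvars

deadline_var: contextvars.ContextVar[float] = contextvars.ContextVar("deadline")

def remaining() -> float:
    return max(0.0, deadline_var.get() - asyncio.get_running_loop().time())

async def leaf() -> None:
    # Any layer bounds its awaits with whatever budget is left
    async with asyncio.timeout(remaining()):
        await asyncio.sleep(0.010)

async def ingress() -> None:
    # Tasks created from here inherit the context, and with it the deadline
    deadline_var.set(asyncio.get_running_loop().time() + 0.120)  # 120 ms budget
    await leaf()

if __name__ == "__main__":
    asyncio.run(ingress())
```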
### uvloop note
`uvloop` can reduce event‑loop overhead for network‑heavy services. Verify correctness and measure tail latencies before adopting.
```python
# examples: uvloop_enable.py
import asyncio

import uvloop

def main() -> None:
    asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
    asyncio.run(asyncio.sleep(0))

if __name__ == "__main__":
    main()
```
## Production checklist
- Bound every queue and cap in‑flight work with semaphores.
- Put deadlines on external awaits; cancel on breach; return idempotent failures.
- Use `TaskGroup` for related child work; handle failures with `except*`.
- Keep shielded regions tiny and only for cleanup/handshakes.
- Prove cancel‑to‑done latency in tests; add shutdown drains with small timeouts.
- Instrument backpressure and cancellations; alert on sustained pressure.
- Load test with latency and drops (toxiproxy); check p95 and memory flatness.
## References
- Python docs: asyncio Task Groups — https://docs.python.org/3/library/asyncio-task.html#task-groups
- PEP 654: Exception Groups and except* — https://peps.python.org/pep-0654/
- Python docs: ExceptionGroup — https://docs.python.org/3/library/exceptions.html#ExceptionGroup
- Python docs: asyncio Queue — https://docs.python.org/3/library/asyncio-queue.html
- Python docs: StreamWriter.drain (flow control) — https://docs.python.org/3/library/asyncio-stream.html#asyncio.StreamWriter.drain
- Python docs: Protocol flow control calls — https://docs.python.org/3/library/asyncio-protocol.html#flow-control-calls
- Python docs: asyncio.wait_for — https://docs.python.org/3/library/asyncio-task.html#asyncio.wait_for
- Python docs: asyncio.timeout (3.11+) — https://docs.python.org/3/library/asyncio-task.html#asyncio.timeout
- Python docs: asyncio.shield — https://docs.python.org/3/library/asyncio-task.html#asyncio.shield
- Python docs: Signals and event loop — https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.add_signal_handler
- PEP 3156: Asynchronous IO Support Rebooted — https://peps.python.org/pep-3156/
- Trio docs: nurseries (structured concurrency) — https://trio.readthedocs.io/en/stable/reference-core.html#nursery
- AnyIO docs: task groups — https://anyio.readthedocs.io/en/stable/task-groups.html
- AnyIO docs: cancellation scopes — https://anyio.readthedocs.io/en/stable/cancellation.html
- N. J. Smith: Structured concurrency (Go statement harmful) — https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/
- M. Sústrik: Structured Concurrency — https://250bpm.com/blog:71
- M. Sústrik: Go Statement Considered Harmful — https://250bpm.com/blog:154
- uvloop (event loop policy) — https://github.com/MagicStack/uvloop
- Toxiproxy (fault injection) — https://github.com/Shopify/toxiproxy
- HTTPX: Timeouts — https://www.python-httpx.org/advanced/#timeouts
- pytest-asyncio — https://pytest-asyncio.readthedocs.io/en/latest/