
Controlling the number of spawned futures to create backpressure


Introduction

When spawning async tasks or futures, an unbounded number of concurrent operations can exhaust memory, file descriptors, or network connections. Backpressure is the mechanism that makes producers slow down when consumers cannot keep up. Capping how many futures run at once is the most direct way to apply it. Most async runtimes and their ecosystems provide semaphores, buffered streams, or bounded channels for this.

The Problem: Unbounded Concurrency

```rust
// Rust: spawning thousands of futures without limits
let urls: Vec<String> = get_urls(); // 10,000 URLs

let handles: Vec<_> = urls
    .into_iter()
    .map(|url| {
        tokio::spawn(async move {
            reqwest::get(&url).await // All 10K requests start at once
        })
    })
    .collect();

// Likely result: sockets pile up until the connection pool or the
// process's file-descriptor limit is exhausted
```
```python
# Python: the same problem with asyncio
import asyncio
import aiohttp

async def fetch_all(urls):
    async with aiohttp.ClientSession() as session:
        tasks = [session.get(url) for url in urls]  # all 10K requests start at once
        return await asyncio.gather(*tasks)
```
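To see the problem without touching the network, the sketch below replaces each HTTP request with asyncio.sleep and counts how many coroutines are in flight at the same time. The helper name fake_request and the 0.01-second delay are illustrative assumptions, not part of any library.

```python
import asyncio

in_flight = 0  # requests currently running
peak = 0       # highest value in_flight ever reached

async def fake_request(i):
    # Stand-in for an HTTP call (hypothetical helper)
    global in_flight, peak
    in_flight += 1
    peak = max(peak, in_flight)
    await asyncio.sleep(0.01)  # simulated network latency
    in_flight -= 1
    return i

async def fetch_all_unbounded(n):
    # Nothing limits concurrency: gather schedules all n coroutines at once
    return await asyncio.gather(*(fake_request(i) for i in range(n)))

results = asyncio.run(fetch_all_unbounded(1_000))
print(len(results), peak)  # peak == 1000: every request was in flight simultaneously
```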

Solution 1: Semaphore (Most Languages)

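A semaphore limits the number of concurrent tasks by requiring each task to acquire a permit before running and to release it when finished. Once all permits are taken, further tasks wait until one is returned.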

Python (asyncio)

```python
import asyncio
import aiohttp

async def fetch(session, url, semaphore):
    # Wait for a permit; at most 20 coroutines get past this line at once
    async with semaphore:
        async with session.get(url) as response:
            return await response.text()

async def main(urls):
    semaphore = asyncio.Semaphore(20)  # Max 20 concurrent requests
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url, semaphore) for url in urls]
        return await asyncio.gather(*tasks)

# results = asyncio.run(main(urls))
```
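The same kind of simulation with an asyncio.Semaphore(20) shows the cap taking effect. As before, asyncio.sleep stands in for the HTTP request, and the names run and limited_request are hypothetical.

```python
import asyncio

async def run(n, limit):
    semaphore = asyncio.Semaphore(limit)
    in_flight = 0
    peak = 0

    async def limited_request(i):
        nonlocal in_flight, peak
        async with semaphore:  # at most `limit` coroutines are inside this block
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # stand-in for network I/O
            in_flight -= 1
        return i

    results = await asyncio.gather(*(limited_request(i) for i in range(n)))
    return results, peak

results, peak = asyncio.run(run(200, 20))
print(len(results), peak)  # 200 results, but never more than 20 in flight
```

Note that gather still creates all 200 tasks up front; the semaphore only gates the body of each one.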

Rust (Tokio)

rust
1use tokio::sync::Semaphore;
2use std::sync::Arc;
3
4let semaphore = Arc::new(Semaphore::new(20));
5let mut handles = Vec::new();
6
7for url in urls {
8    let permit = semaphore.clone().acquire_owned().await.unwrap();
9    handles.push(tokio::spawn(async move {
10        let result = reqwest::get(&url).await;
11        drop(permit); // Release when done
12        result
13    }));
14}

JavaScript (Manual Semaphore)

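```javascript
async function fetchWithLimit(urls, limit) {
  const results = [];
  const executing = new Set();

  for (const url of urls) {
    const promise = fetch(url).then(r => r.json());
    results.push(promise);

    // Track the promise until it settles. Handling both outcomes keeps the
    // tracker from rejecting, so Promise.race below never throws mid-loop
    // and no unhandled rejection is left behind.
    const tracker = promise.then(
      () => executing.delete(tracker),
      () => executing.delete(tracker)
    );
    executing.add(tracker);

    if (executing.size >= limit) {
      await Promise.race(executing); // Wait until one slot frees up
    }
  }
  // Rejects with the first error if any request failed
  return Promise.all(results);
}

await fetchWithLimit(urls, 20); // top-level await requires an ES module
```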
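For comparison, here is a sketch of the same race-based pool in Python, where asyncio.wait(..., return_when=asyncio.FIRST_COMPLETED) plays the role of Promise.race. Unlike the semaphore version, no task is created until a slot is free. The function name run_with_limit and the job signature are assumptions for illustration.

```python
import asyncio

async def run_with_limit(job, items, limit):
    """Run job(item) for each item with at most `limit` tasks alive; results keep input order."""
    results = [None] * len(items)
    index_of = {}   # task -> position of its item
    pending = set()

    for i, item in enumerate(items):
        if len(pending) >= limit:
            # Like Promise.race: wait until at least one in-flight task finishes
            done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
            for task in done:
                results[index_of.pop(task)] = task.result()  # re-raises the task's exception
        task = asyncio.create_task(job(item))
        index_of[task] = i
        pending.add(task)

    if pending:  # asyncio.wait raises ValueError on an empty set
        done, _ = await asyncio.wait(pending)
        for task in done:
            results[index_of.pop(task)] = task.result()
    return results

async def square_later(x):
    await asyncio.sleep(0.01)  # stand-in for I/O
    return x * x

print(asyncio.run(run_with_limit(square_later, list(range(10)), limit=3)))
# [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```

If a job raises, task.result() re-raises it and the other tasks are left running. A production version would cancel them or collect the exceptions instead.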

Solution 2: Buffered Streams (Rust)

The futures crate provides buffer_unordered to process a stream of futures with bounded concurrency:

```rust
use futures::stream::{self, StreamExt};

let results: Vec<_> = stream::iter(urls)
    .map(|url| async move {
        reqwest::get(&url).await
    })
    .buffer_unordered(20) // Max 20 in-flight futures
    .collect()
    .await;
```

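buffer_unordered yields results as they complete, so the output order can differ from the input order. Use buffered to preserve input order. It also keeps up to n futures in flight, but one slow future holds back the results queued behind it.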
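Python has no built-in equivalent of buffered, but the idea is small enough to sketch. Keep a FIFO of at most n tasks and always await the oldest one, so results come out in input order while up to n run concurrently. The name buffered_map is hypothetical. This is an illustration of the technique, not a port of the futures crate.

```python
import asyncio
from collections import deque

async def buffered_map(fn, items, n):
    """Async generator: yield await fn(item) in input order, with at most n tasks alive."""
    window = deque()
    for item in items:
        window.append(asyncio.create_task(fn(item)))
        if len(window) >= n:
            # Head-of-line: wait for the oldest task even if newer ones finished first
            yield await window.popleft()
    while window:
        yield await window.popleft()

async def main():
    async def slow_double(x):
        await asyncio.sleep(0.01 * (5 - x))  # earlier items take longer
        return 2 * x
    return [r async for r in buffered_map(slow_double, range(5), n=3)]

print(asyncio.run(main()))  # [0, 2, 4, 6, 8]
```

The sketch assumes the caller consumes the generator to the end. If it stops early, tasks still in the window keep running unless they are cancelled.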

Solution 3: Bounded Channels

Producer-consumer patterns with a bounded channel apply backpressure naturally. When the channel is full, the producer's send (here, await queue.put()) waits until a consumer makes room.

```python
import asyncio
import aiohttp

NUM_WORKERS = 10  # Caps concurrency; maxsize below caps how far the producer runs ahead

async def producer(queue, urls):
    for url in urls:
        await queue.put(url)  # Waits (without blocking the event loop) while the queue is full
    for _ in range(NUM_WORKERS):
        await queue.put(None)  # One sentinel per worker to stop it

async def worker(queue, session):
    while True:
        url = await queue.get()
        try:
            if url is None:
                break  # Sentinel: no more work
            async with session.get(url) as resp:
                await resp.text()
        except aiohttp.ClientError as exc:
            # Keep the worker alive: if every worker died, the producer
            # would wait forever on a full queue
            print(f"{url}: {exc}")
        finally:
            queue.task_done()

async def main(urls):
    queue = asyncio.Queue(maxsize=20)  # Backpressure threshold
    async with aiohttp.ClientSession() as session:
        workers = [asyncio.create_task(worker(queue, session)) for _ in range(NUM_WORKERS)]
        await producer(queue, urls)
        await asyncio.gather(*workers)

# asyncio.run(main(urls))
```
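The backpressure in this pattern can be observed without a network. In the sketch below, asyncio.sleep replaces the HTTP call, and the producer records how far it has run ahead of the workers. Items not yet finished are either in the queue (at most maxsize) or held by a worker (at most one each), so the lead can never exceed maxsize + num_workers. The function name pipeline and the sizes used are illustrative assumptions.

```python
import asyncio

async def pipeline(n_items, maxsize, num_workers):
    queue = asyncio.Queue(maxsize=maxsize)
    stats = {"produced": 0, "finished": 0, "max_lead": 0}

    async def producer():
        for i in range(n_items):
            await queue.put(i)  # suspends here whenever the queue is full
            stats["produced"] += 1
            lead = stats["produced"] - stats["finished"]
            stats["max_lead"] = max(stats["max_lead"], lead)
        for _ in range(num_workers):
            await queue.put(None)  # one stop sentinel per worker

    async def worker():
        while True:
            item = await queue.get()
            if item is None:
                break
            await asyncio.sleep(0.001)  # stand-in for a slow consumer
            stats["finished"] += 1

    workers = [asyncio.create_task(worker()) for _ in range(num_workers)]
    await producer()
    await asyncio.gather(*workers)
    return stats

stats = asyncio.run(pipeline(n_items=100, maxsize=5, num_workers=2))
print(stats["finished"], stats["max_lead"])  # max_lead stays <= maxsize + num_workers
```

With an unbounded queue (maxsize=0), the producer would enqueue all 100 items before any worker ran, and max_lead would reach 100.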

Solution 4: Thread/Task Pool Executors

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
import requests

with ThreadPoolExecutor(max_workers=20) as executor:
    # requests has no default timeout, so pass one explicitly
    future_to_url = {executor.submit(requests.get, url, timeout=10): url for url in urls}
    for future in as_completed(future_to_url):
        url = future_to_url[future]
        response = future.result()  # re-raises any exception from requests.get
```

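The pool's max_workers parameter limits how many calls run at once. Excess submissions wait in the executor's internal work queue, which is unbounded. Because submit() never blocks, the loop above creates a Future for every URL immediately, and the submitter itself gets no backpressure.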
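One common way to add backpressure on the submitting side is to guard submit() with a threading.BoundedSemaphore and release the permit from a done-callback. The sketch below assumes that pattern. bounded_map and its max_pending parameter are hypothetical names, not part of concurrent.futures.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

def bounded_map(fn, items, max_workers=4, max_pending=8):
    """Like executor.map, but at most max_pending futures are unfinished at any time."""
    slots = threading.BoundedSemaphore(max_pending)
    futures = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        for item in items:
            slots.acquire()  # blocks this loop once max_pending futures are outstanding
            future = executor.submit(fn, item)
            # Give the slot back when the future finishes (result or exception)
            future.add_done_callback(lambda _f: slots.release())
            futures.append(future)
        # result() re-raises any exception raised inside fn
        return [f.result() for f in futures]

print(bounded_map(lambda x: x * x, range(6)))  # [0, 1, 4, 9, 16, 25]
```

Because items are pulled lazily, a generator input is read only as fast as the pool drains. This sketch still keeps every Future in a list so it can return results in order. For very large inputs, consume each result as it completes instead.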

Choosing the Right Concurrency Limit

The optimal limit depends on the bottleneck:

| Bottleneck | Typical limit | How to determine |
|---|---|---|
| Network I/O (HTTP) | 20-100 | Server rate limits, connection pool size |
| Database connections | Pool size (e.g., 10-50) | Match the connection pool maximum |
| CPU-bound work | Number of cores | os.cpu_count() or num_cpus::get() |
| File descriptors | ulimit -n minus headroom | Check OS limits |
| External API | API rate limit | Read the API docs |
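As a rough starting point for the CPU and file-descriptor rows, both limits can be read from the running process. This sketch assumes a Unix-like OS, because the resource module is not available on Windows. The headroom of 64 descriptors is an arbitrary illustrative value, not a recommendation. The function names are hypothetical.

```python
import os
import resource  # Unix-only standard library module

def cpu_limit():
    # os.cpu_count() can return None if the count cannot be determined
    return os.cpu_count() or 1

def fd_limit(headroom=64):
    # Soft limit on open file descriptors, i.e. what `ulimit -n` reports
    soft, _hard = resource.getrlimit(resource.RLIMIT_NOFILE)
    if soft == resource.RLIM_INFINITY:
        return None  # no explicit limit; let another bottleneck set the cap
    return max(1, soft - headroom)

print("CPU-bound workers:", cpu_limit())
print("Max concurrent sockets/files:", fd_limit())
```

On Linux, len(os.sched_getaffinity(0)) reflects CPU-affinity restrictions on the process, which os.cpu_count() ignores.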

Common Pitfalls

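  • Creating all futures before limiting: asyncio.gather(*[fetch(url) for url in urls]) creates every coroutine object up front, and gather wraps each one in a Task immediately. A semaphore acquired inside fetch caps how many request bodies run at once, but all N tasks still exist in memory. For very large inputs, acquire the permit before creating each task (as the Tokio example does), or use a fixed pool of workers pulling from a bounded queue.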
  • Deadlocking with nested semaphore acquisition: If a task acquires a semaphore permit and then calls a function that also acquires from the same semaphore, it deadlocks when the pool is full. Use separate semaphores for separate resource types.
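  • Not handling errors inside limited tasks: futures in a buffer_unordered stream are polled by the task that drives the stream, so a panic in one of them unwinds that whole task. Collecting with try_collect stops at the first Err, and the futures still in flight are dropped with the stream. Handle errors inside each future and return Result values instead of calling unwrap(), so one failure does not cancel the rest.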
  • Setting limits too low: An overly conservative limit (e.g., 1-2 concurrent requests) serializes work and wastes throughput. Benchmark with increasing limits to find the sweet spot where throughput plateaus.
  • Ignoring downstream backpressure: Limiting request concurrency but buffering all responses in memory defeats the purpose. If results are large, process them incrementally instead of collecting into a Vec or list.
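The first and last pitfalls can be addressed together. Acquire the permit before creating each task, as the Tokio example does, and hand each result to a consumer as soon as it is ready instead of collecting a list. The sketch below assumes an asyncio.Semaphore for the permits. spawn_bounded, on_result, and the job signature are hypothetical names.

```python
import asyncio

async def spawn_bounded(job, items, limit, on_result):
    """Keep at most `limit` tasks alive; pass each result to on_result as it finishes."""
    semaphore = asyncio.Semaphore(limit)
    tasks = set()

    async def run_one(item):
        try:
            on_result(await job(item))
        finally:
            semaphore.release()  # free the slot even if job raises

    for item in items:
        await semaphore.acquire()  # the loop waits here: no task is created until a slot is free
        task = asyncio.create_task(run_one(item))
        tasks.add(task)
        task.add_done_callback(tasks.discard)  # hold references only to live tasks

    await asyncio.gather(*tasks)  # wait for the last in-flight tasks

async def main():
    async def fetch_value(x):
        await asyncio.sleep(0.01)  # stand-in for a request
        return x

    total = 0

    def consume(result):
        nonlocal total
        total += result  # processed incrementally; nothing accumulates in a list

    await spawn_bounded(fetch_value, range(100), limit=10, on_result=consume)
    return total

print(asyncio.run(main()))  # 4950
```

This sketch assumes job handles its own errors, as the third pitfall recommends. Exceptions from tasks that finish before the final gather are not re-raised.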

Summary

  • Unbounded future spawning can exhaust memory, sockets, and file descriptors, so always put a limit on concurrency
  • Use asyncio.Semaphore (Python), tokio::sync::Semaphore (Rust), or manual promise pools (JavaScript)
  • buffer_unordered in Rust's futures crate is the idiomatic stream-based approach
  • Bounded channels (asyncio.Queue, tokio::sync::mpsc) provide natural backpressure
  • Thread pool executors (ThreadPoolExecutor, Rayon) limit concurrency via pool size
  • Match the concurrency limit to the actual bottleneck (network, DB, CPU, OS limits)
