Async Concurrent Queue with Max Concurrency

Introduction

An async concurrent queue with a maximum concurrency limit is a queue that accepts many jobs but runs only a fixed number at the same time. This pattern is useful when you want parallelism for I/O-heavy work without flooding an API, database, or remote service.

The Core Pattern

There are two responsibilities:

  1. hold pending work in a queue
  2. ensure only N jobs run concurrently

In Python asyncio, the cleanest solution usually combines:

  • asyncio.Queue for pending jobs
  • a fixed number of worker tasks, or a semaphore, to enforce the concurrency cap

The worker-pool design is easy to reason about because you create exactly as many workers as the allowed concurrency.

A Runnable asyncio Implementation

```python
import asyncio
import random


async def process_job(job_id):
    # Simulate I/O-bound work with a random delay.
    delay = random.uniform(0.2, 0.8)
    await asyncio.sleep(delay)
    print(f"finished job {job_id} in {delay:.2f}s")


async def worker(name, queue):
    while True:
        job = await queue.get()
        if job is None:
            # Shutdown sentinel: acknowledge it and exit.
            queue.task_done()
            return

        try:
            await process_job(job)
        except Exception as exc:
            # Report the failure and keep the worker alive for the next job.
            print(f"{name}: job {job} failed: {exc!r}")
        finally:
            # Always mark the job done so queue.join() can return.
            queue.task_done()


async def main():
    queue = asyncio.Queue()
    max_concurrency = 3

    # One worker per allowed concurrent job.
    workers = [
        asyncio.create_task(worker(f"worker-{i}", queue))
        for i in range(max_concurrency)
    ]

    for job_id in range(10):
        await queue.put(job_id)

    # Wait until every queued job has been processed.
    await queue.join()

    # Send one sentinel per worker so each one exits cleanly.
    for _ in workers:
        await queue.put(None)

    await asyncio.gather(*workers)


asyncio.run(main())
```

Only three workers are created, so only three jobs can be active at once even though ten jobs are queued.
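One way to check that claim is to count how many jobs are active at any moment. The sketch below is a minimal illustration, not part of the implementation above: `tracked_job`, `measure_peak`, and the `stats` dictionary are hypothetical names, and the short sleep stands in for real I/O.

```python
import asyncio


async def tracked_job(stats):
    # Count this job as active and remember the highest count seen so far.
    stats["active"] += 1
    stats["peak"] = max(stats["peak"], stats["active"])
    try:
        await asyncio.sleep(0.01)  # stand-in for real I/O
    finally:
        stats["active"] -= 1


async def worker(queue, stats):
    while True:
        job = await queue.get()
        try:
            if job is None:  # shutdown sentinel
                return
            await tracked_job(stats)
        finally:
            queue.task_done()


async def measure_peak(num_jobs, max_concurrency):
    queue = asyncio.Queue()
    stats = {"active": 0, "peak": 0}
    workers = [
        asyncio.create_task(worker(queue, stats))
        for _ in range(max_concurrency)
    ]
    for job_id in range(num_jobs):
        queue.put_nowait(job_id)
    await queue.join()
    for _ in workers:
        queue.put_nowait(None)
    await asyncio.gather(*workers)
    return stats["peak"]


if __name__ == "__main__":
    print(asyncio.run(measure_peak(num_jobs=10, max_concurrency=3)))
```

With ten jobs and three workers the reported peak should be 3; with fewer jobs than workers it should equal the number of jobs.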

Why This Is Better Than Launching Everything

Without a concurrency limit, it is tempting to create one task per job and await asyncio.gather() on all of them. That works for small workloads, but it scales poorly when each job opens a connection or holds memory, because every job starts at once.
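To make the cost of the unbounded approach concrete, the following sketch counts how many jobs are in flight when everything is handed to asyncio.gather at once. The names `tracked_job` and `launch_everything` are hypothetical, and the sleep is a placeholder for a network call.

```python
import asyncio


async def tracked_job(stats):
    # Record how many jobs are active at the same time.
    stats["active"] += 1
    stats["peak"] = max(stats["peak"], stats["active"])
    try:
        await asyncio.sleep(0.01)  # stand-in for a network call
    finally:
        stats["active"] -= 1


async def launch_everything(num_jobs):
    stats = {"active": 0, "peak": 0}
    # gather wraps every coroutine in a task immediately: nothing is held back.
    await asyncio.gather(*(tracked_job(stats) for _ in range(num_jobs)))
    return stats["peak"]


if __name__ == "__main__":
    print(asyncio.run(launch_everything(100)))
```

Here the peak equals the number of jobs, so a thousand jobs would mean a thousand simultaneous connections.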

Bounding concurrency improves stability because:

  • backpressure is explicit: pending jobs wait in the queue instead of piling onto the downstream service
  • peak resource usage stays predictable
  • rate-limited systems are less likely to reject requests

This is especially important for web scraping, API clients, message processing, and batch jobs with thousands of network operations.

Using a Semaphore Instead

Another valid pattern is to create many tasks but protect the critical section with a semaphore:

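```python
import asyncio


async def bounded_job(job_id, semaphore):
    # At most three coroutines can be inside this block at once.
    async with semaphore:
        await asyncio.sleep(0.5)
        print(f"done {job_id}")


async def main():
    # Create the semaphore inside the running event loop.
    semaphore = asyncio.Semaphore(3)
    await asyncio.gather(*(bounded_job(i, semaphore) for i in range(10)))


asyncio.run(main())
```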

This is useful when tasks are created elsewhere and you only need to cap a specific resource-consuming region. The queue-based worker model is usually cleaner when you are building a reusable scheduler.

Design Choices That Matter

Think about how the queue should behave under load:

  • Should producers block when the queue gets too large?
  • Should failed jobs be retried?
  • Should results be returned in completion order or submission order?

Those are application-level decisions, but the concurrency-limited queue is usually the foundation underneath them.
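As one possible answer to the retry and ordering questions, the sketch below retries a failed job a fixed number of times and stores each result at the index where the job was submitted, so the output follows submission order even though jobs finish in a different order. Everything here is an assumption made for illustration: `flaky_fetch` is a made-up job that fails on its first attempt for odd IDs, and `run_all` and `max_attempts` are hypothetical names.

```python
import asyncio


async def flaky_fetch(job_id, attempt):
    # Hypothetical job: odd-numbered jobs fail on their first attempt.
    await asyncio.sleep(0.01 * (5 - job_id % 5))
    if job_id % 2 == 1 and attempt == 1:
        raise ConnectionError(f"job {job_id} failed")
    return job_id * 10


async def worker(queue, results, max_attempts):
    while True:
        item = await queue.get()
        try:
            if item is None:  # shutdown sentinel
                return
            index, job_id = item
            for attempt in range(1, max_attempts + 1):
                try:
                    # Store by submission index, not by completion time.
                    results[index] = await flaky_fetch(job_id, attempt)
                    break
                except ConnectionError as exc:
                    # Keep the last error if every attempt fails.
                    results[index] = exc
        finally:
            queue.task_done()


async def run_all(job_ids, max_concurrency=3, max_attempts=2):
    queue = asyncio.Queue()
    results = [None] * len(job_ids)
    workers = [
        asyncio.create_task(worker(queue, results, max_attempts))
        for _ in range(max_concurrency)
    ]
    for index, job_id in enumerate(job_ids):
        queue.put_nowait((index, job_id))
    await queue.join()
    for _ in workers:
        queue.put_nowait(None)
    await asyncio.gather(*workers)
    return results


if __name__ == "__main__":
    print(asyncio.run(run_all(list(range(6)))))
```

A job that fails on every attempt leaves its exception in the results list, which lets the caller decide what to do with it. If completion order is preferred instead, appending to a list as each job finishes would be the simpler choice.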

If you want backpressure, give the queue a maximum size:

```python
queue = asyncio.Queue(maxsize=100)
```

That prevents producers from overwhelming memory with unlimited pending work: once the queue is full, await queue.put() suspends the producer until a worker takes an item.
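The effect of maxsize can be observed with a fast producer and a slow consumer. This is a minimal sketch under assumed names (`producer`, `consumer`, `demo`, `observed_sizes`); it records the queue size after every put and reports the largest value seen.

```python
import asyncio


async def producer(queue, num_jobs, observed_sizes):
    for job_id in range(num_jobs):
        # Suspends here whenever the queue already holds maxsize items.
        await queue.put(job_id)
        observed_sizes.append(queue.qsize())


async def consumer(queue):
    while True:
        await queue.get()
        await asyncio.sleep(0.001)  # deliberately slower than the producer
        queue.task_done()


async def demo(num_jobs=20, maxsize=5):
    queue = asyncio.Queue(maxsize=maxsize)
    observed_sizes = []
    consumer_task = asyncio.create_task(consumer(queue))
    await producer(queue, num_jobs, observed_sizes)
    await queue.join()
    # The consumer loops forever, so stop it once all work is done.
    consumer_task.cancel()
    await asyncio.gather(consumer_task, return_exceptions=True)
    return max(observed_sizes)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The largest observed size never exceeds maxsize, no matter how many jobs the producer tries to enqueue.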

Common Pitfalls

  • Launching unlimited tasks and assuming async automatically manages resource usage.
  • Forgetting queue.task_done(), which can make queue.join() hang forever.
  • Not sending shutdown sentinels to workers, leaving them blocked on queue.get().
  • Using high concurrency for CPU-bound work, which usually needs multiprocessing instead.
  • Ignoring backpressure when producers are much faster than consumers.
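The task_done() pitfall is easy to reproduce. The sketch below, with hypothetical names `forgetful_worker`, `careful_worker`, and `join_finishes`, wraps queue.join() in asyncio.wait_for so that a missing task_done() shows up as a timeout rather than a silent hang. It also stops the worker by cancelling it, which is an alternative to sentinels when workers hold no state that needs cleanup.

```python
import asyncio


async def forgetful_worker(queue):
    while True:
        await queue.get()
        # Bug on purpose: queue.task_done() is never called.


async def careful_worker(queue):
    while True:
        await queue.get()
        try:
            await asyncio.sleep(0)  # stand-in for real work
        finally:
            queue.task_done()


async def join_finishes(worker_fn, timeout=0.2):
    queue = asyncio.Queue()
    for job_id in range(5):
        queue.put_nowait(job_id)
    task = asyncio.create_task(worker_fn(queue))
    try:
        # wait_for turns a silent hang into a visible timeout.
        await asyncio.wait_for(queue.join(), timeout)
        return True
    except asyncio.TimeoutError:
        return False
    finally:
        # Stop the worker, which is blocked on queue.get() by now.
        task.cancel()
        await asyncio.gather(task, return_exceptions=True)


if __name__ == "__main__":
    print(asyncio.run(join_finishes(careful_worker)))
    print(asyncio.run(join_finishes(forgetful_worker)))
```

The careful worker lets join() return, while the forgetful one consumes every item and still leaves join() waiting until the timeout fires.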

Summary

  • An async concurrent queue runs many jobs overall but only a fixed number at once.
  • In asyncio, Queue plus a fixed worker pool is a simple and reliable pattern.
  • A semaphore is another good option when only a specific section needs concurrency control.
  • Bounded concurrency protects APIs, databases, and system memory.
  • The pattern becomes much more useful when combined with backpressure, retries, and clear shutdown behavior.
