
How to get subprocess' stdout data asynchronously?


Introduction

Reading subprocess stdout asynchronously matters when a child process produces streaming output and your application must stay responsive. Blocking calls like subprocess.run(..., capture_output=True) wait for the child to exit and then return all output at once. That is fine for short commands, but not for long-running tools, interactive pipelines, or live logs, and inside an async service it also stalls the event loop. In modern Python, asyncio.create_subprocess_exec is the standard approach. It lets you consume stdout incrementally, handle stderr in parallel, and enforce timeouts or cancellation cleanly. The main design goal is to avoid deadlocks by continuously draining both streams while the process runs.

Core Sections

Start subprocess with async pipes

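Use the asyncio subprocess API and request a pipe for stdout. In this first example stderr is merged into the same pipe with stderr=asyncio.subprocess.STDOUT, so a single loop drains everything and an unread stderr pipe cannot stall the child; the next section keeps the two streams separate.

```python
import asyncio
import sys

async def run_cmd():
    # worker.py is a placeholder for your own script; sys.executable reuses this interpreter.
    proc = await asyncio.create_subprocess_exec(
        sys.executable, "-u", "worker.py",
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.STDOUT,  # merge stderr into the stdout pipe
    )

    # proc.stdout is an asyncio.StreamReader; async iteration yields one bytes line at a time.
    async for line in proc.stdout:
        print("OUT:", line.decode(errors="replace").rstrip())

    # The loop ends at EOF; wait() then collects the exit status.
    rc = await proc.wait()
    print("exit", rc)

asyncio.run(run_cmd())
```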

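The -u option makes the Python child's stdout and stderr unbuffered. Without it, CPython block-buffers stdout when stdout is a pipe, so lines reach the parent in bursts, or only at exit, instead of as they are printed.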
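To observe the effect of -u without writing worker.py, the sketch below swaps in a hypothetical inline child (the TICKER string, passed with -c) that prints three lines with pauses, and records when each line reaches the parent. The helper name stream_with_timestamps is illustrative, not a library API.

```python
import asyncio
import sys
import time

# Hypothetical stand-in for worker.py: three lines, 0.3 s apart.
TICKER = "import time\nfor i in range(3):\n    print(f'tick {i}')\n    time.sleep(0.3)"

async def stream_with_timestamps(argv):
    """Return (seconds_since_start, line) for each stdout line, in arrival order."""
    start = time.monotonic()
    proc = await asyncio.create_subprocess_exec(*argv, stdout=asyncio.subprocess.PIPE)
    arrivals = []
    async for line in proc.stdout:
        arrivals.append((time.monotonic() - start, line.decode().rstrip()))
    await proc.wait()
    return arrivals

if __name__ == "__main__":
    for t, line in asyncio.run(stream_with_timestamps([sys.executable, "-u", "-c", TICKER])):
        print(f"{t:5.2f}s {line}")
```

With -u the arrival times should be spread roughly 0.3 s apart. Dropping "-u" from the argument list typically makes all three lines arrive together when the child exits. Setting PYTHONUNBUFFERED=1 in the child's environment has the same effect as -u.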

Read stdout and stderr concurrently

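If you read only stdout while the child writes heavily to stderr, the stderr pipe buffer eventually fills, the child blocks on its next write, and your read on stdout never completes. Drain both streams concurrently.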

```python
import asyncio

async def stream(reader, label):
    # Forward each line from one pipe as soon as it arrives.
    async for raw in reader:
        print(f"{label}: {raw.decode(errors='replace').rstrip()}")

async def run_both():
    proc = await asyncio.create_subprocess_exec(
        "bash", "-c", "echo hi; echo err 1>&2; sleep 1; echo done",
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )

    # Drain both pipes concurrently so neither can fill up and stall the child.
    await asyncio.gather(
        stream(proc.stdout, "STDOUT"),
        stream(proc.stderr, "STDERR"),
    )
    return await proc.wait()

asyncio.run(run_both())
```

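Because each pipe has its own reader, neither buffer can fill up while the other is being awaited, which avoids pipe backpressure deadlocks. If you do not need incremental output, await proc.communicate() also drains both pipes safely, but it buffers the entire output in memory and returns only after the child exits.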
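The sketch below makes that failure mode concrete. A hypothetical child (the CHILD string) writes about 1 MB to stderr before printing one line to stdout. That is far more than a typical pipe buffer (commonly 64 KiB on Linux), so a parent that read only stdout would wait forever while the child sat blocked on stderr. Reading both pipes with asyncio.gather lets it finish. The helper names count_bytes and run_noisy_child are illustrative.

```python
import asyncio
import sys

# Hypothetical noisy child: ~1 MB on stderr first, then one line on stdout.
CHILD = "import sys; sys.stderr.write('x' * 1_000_000); sys.stderr.flush(); print('done')"

async def count_bytes(reader):
    """Drain a StreamReader in chunks and return how many bytes it produced."""
    total = 0
    while chunk := await reader.read(65536):
        total += len(chunk)
    return total

async def run_noisy_child():
    proc = await asyncio.create_subprocess_exec(
        sys.executable, "-c", CHILD,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    # Both pipes are drained at the same time; the timeout is only a safety net.
    out, err_bytes = await asyncio.wait_for(
        asyncio.gather(proc.stdout.read(), count_bytes(proc.stderr)), timeout=30
    )
    rc = await proc.wait()
    return out.decode().strip(), err_bytes, rc

if __name__ == "__main__":
    print(asyncio.run(run_noisy_child()))
```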

Add timeout and cancellation control

Long-running commands should have bounded execution.

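```python
import asyncio

async def drain(reader, label):
    async for raw in reader:
        print(f"{label}: {raw.decode(errors='replace').rstrip()}")

async def run_with_timeout():
    proc = await asyncio.create_subprocess_exec(
        "my_command",  # placeholder for the real executable
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    async def consume():
        # Drain both pipes inside the timeout; proc.wait() alone can deadlock on full pipes.
        await asyncio.gather(drain(proc.stdout, "STDOUT"), drain(proc.stderr, "STDERR"))
        return await proc.wait()

    try:
        return await asyncio.wait_for(consume(), timeout=30)
    except asyncio.TimeoutError:
        proc.kill()
        await proc.wait()  # reap the killed child
        raise
```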

Always await proc.wait() after kill or terminate to avoid zombie processes.
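Cancellation needs the same cleanup as a timeout. A sketch under these assumptions: the hypothetical helper stop() first asks the child to exit with terminate(), escalates to kill() after a grace period, and always reaps it; stream_until_cancelled() calls it when its task is cancelled, so the child is stopped before the CancelledError propagates. The TICKER child and the 0.5 s and 2 s delays are arbitrary choices for illustration.

```python
import asyncio
import sys

# Hypothetical child that prints forever until it is stopped.
TICKER = "import time\nwhile True:\n    print('tick', flush=True)\n    time.sleep(0.1)"

async def stop(proc, grace=2.0):
    """Ask the child to exit, force it after `grace` seconds, and reap it."""
    if proc.returncode is None:
        proc.terminate()  # SIGTERM on POSIX, TerminateProcess on Windows
        try:
            await asyncio.wait_for(proc.wait(), timeout=grace)
        except asyncio.TimeoutError:
            proc.kill()
    return await proc.wait()

async def stream_until_cancelled(proc):
    try:
        async for line in proc.stdout:
            print("OUT:", line.decode(errors="replace").rstrip())
        return await proc.wait()
    except asyncio.CancelledError:
        await stop(proc)  # clean up the child, then let cancellation propagate
        raise

async def demo():
    proc = await asyncio.create_subprocess_exec(
        sys.executable, "-c", TICKER, stdout=asyncio.subprocess.PIPE
    )
    task = asyncio.create_task(stream_until_cancelled(proc))
    await asyncio.sleep(0.5)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return proc.returncode

if __name__ == "__main__":
    print("child exit code:", asyncio.run(demo()))
```

Trying terminate() first gives the child a chance to flush and clean up; kill() is the fallback for children that ignore or handle the termination request.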

Integrate with structured logging

Instead of printing lines directly, route them to your logging system with command name, pid, and correlation IDs. This helps trace failures when many subprocesses run concurrently.
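A minimal sketch using only the standard logging module: a logging.LoggerAdapter attaches the command, the child's pid, and a correlation ID to every record, and stderr lines are logged at WARNING. The logger name "subproc", the field names cmd, child_pid and correlation_id, and the format string are assumptions to adapt to your own logging schema. child_pid is used instead of pid because %(process)d in a format string already means the parent's pid.

```python
import asyncio
import logging
import sys
import uuid

logger = logging.getLogger("subproc")  # hypothetical logger name

async def log_stream(reader, log, level):
    """Emit one log record per line read from a subprocess pipe."""
    async for raw in reader:
        log.log(level, raw.decode(errors="replace").rstrip())

async def run_logged(argv, correlation_id=None):
    proc = await asyncio.create_subprocess_exec(
        *argv, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
    )
    # Every record emitted through the adapter gets these extra attributes.
    log = logging.LoggerAdapter(logger, {
        "cmd": argv[0],
        "child_pid": proc.pid,
        "correlation_id": correlation_id or uuid.uuid4().hex,
    })
    await asyncio.gather(
        log_stream(proc.stdout, log, logging.INFO),
        log_stream(proc.stderr, log, logging.WARNING),
    )
    rc = await proc.wait()
    log.info("exited with code %s", rc)
    return rc

if __name__ == "__main__":
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter(
        "%(levelname)s cmd=%(cmd)s pid=%(child_pid)s cid=%(correlation_id)s %(message)s"
    ))
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    asyncio.run(run_logged([sys.executable, "-c", "print('hello')"]))
```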

Common Pitfalls

  • Using blocking subprocess APIs in async services and freezing the event loop.
  • Reading only stdout and ignoring stderr, which can fill buffers and deadlock the child process.
  • Forgetting unbuffered mode for child scripts, causing delayed or bursty output delivery.
  • Killing timed-out processes without awaiting final cleanup, leaving zombie processes.
  • Decoding output without handling encoding mismatches or binary output cases.
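On the last pitfall: decoding whole lines is safe for UTF-8, because a newline byte never falls inside a multibyte character. When you read fixed-size chunks instead, for example to stream output that has no newlines, a character can be split across two chunks, and calling bytes.decode() on each chunk will raise or garble it. A sketch using codecs.getincrementaldecoder, which holds back an incomplete byte sequence until the next chunk arrives; the helper name read_text_chunks and its tiny default chunk_size (chosen to force splits) are illustrative.

```python
import asyncio
import codecs
import sys

async def read_text_chunks(reader, encoding="utf-8", chunk_size=4):
    """Yield text decoded from fixed-size byte chunks of a StreamReader."""
    # The incremental decoder keeps a partial multibyte sequence until the next chunk.
    decoder = codecs.getincrementaldecoder(encoding)(errors="replace")
    while chunk := await reader.read(chunk_size):
        text = decoder.decode(chunk)
        if text:
            yield text
    # Flush at EOF; a truncated trailing sequence becomes U+FFFD instead of raising.
    tail = decoder.decode(b"", final=True)
    if tail:
        yield tail

async def demo():
    text = "h\u00e9llo \u2192 \u2713"  # sample with 2- and 3-byte UTF-8 characters
    code = "import sys; sys.stdout.buffer.write(" + ascii(text) + ".encode('utf-8'))"
    proc = await asyncio.create_subprocess_exec(
        sys.executable, "-c", code, stdout=asyncio.subprocess.PIPE
    )
    parts = [part async for part in read_text_chunks(proc.stdout)]
    await proc.wait()
    return text, "".join(parts)

if __name__ == "__main__":
    expected, received = asyncio.run(demo())
    print(received == expected, repr(received))
```

For output that may be genuinely binary, skip decoding entirely and pass the raw bytes on.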

Verification Workflow

After implementing the main approach, run a short verification loop that proves behavior on realistic and adversarial inputs. Start with a small happy-path sample that should always pass, then add one edge case and one failure case that should be rejected or handled gracefully. Capture concrete outputs instead of relying on visual inspection alone. For operational code, record one measurable signal such as runtime, memory use, or error count so you can compare before and after future refactors.

Use this quick template during local development and CI:

```text
1. Prepare deterministic sample input
2. Run expected-success scenario
3. Run expected-edge scenario
4. Run expected-failure scenario
5. Assert output schema and key values
6. Record one performance or reliability metric
```

This discipline catches most regressions caused by dependency upgrades, environment differences, or hidden assumptions in helper functions. It also makes handoffs easier because another engineer can reproduce behavior quickly without reverse-engineering your intent from source code alone.
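The template above can be sketched as plain assertions, assuming the child commands are small inline Python scripts run with sys.executable so the inputs stay deterministic. collect(), py() and verify() are hypothetical helpers, and total wall-clock time stands in for the recorded metric; the checks can be wrapped in pytest or run directly in CI.

```python
import asyncio
import sys
import time

async def collect(argv, timeout=10.0):
    """Run argv, drain stdout and stderr concurrently, return (out_lines, err_lines, rc)."""
    proc = await asyncio.create_subprocess_exec(
        *argv, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
    )

    async def lines(reader):
        return [raw.decode(errors="replace").rstrip("\r\n") async for raw in reader]

    async def run():
        out, err = await asyncio.gather(lines(proc.stdout), lines(proc.stderr))
        return out, err, await proc.wait()

    try:
        return await asyncio.wait_for(run(), timeout)
    except asyncio.TimeoutError:
        proc.kill()
        await proc.wait()
        raise

def py(code):
    """Deterministic child: an inline script run by the current interpreter."""
    return [sys.executable, "-c", code]

def verify():
    start = time.perf_counter()
    # Expected success: two lines on stdout, nothing on stderr, exit 0.
    assert asyncio.run(collect(py("print('a'); print('b')"))) == (["a", "b"], [], 0)
    # Expected edge: the final line has no trailing newline.
    assert asyncio.run(collect(py("import sys; sys.stdout.write('partial')"))) == (["partial"], [], 0)
    # Expected failure: message on stderr and a non-zero exit code.
    out, err, rc = asyncio.run(collect(py("import sys; sys.exit('boom')")))
    assert (out, err, rc) == ([], ["boom"], 1)
    # Expected failure: a hung child is killed by the timeout.
    try:
        asyncio.run(collect(py("import time; time.sleep(30)"), timeout=0.5))
        raise AssertionError("timeout was not enforced")
    except asyncio.TimeoutError:
        pass
    # Metric: total wall-clock time for the whole check.
    return time.perf_counter() - start

if __name__ == "__main__":
    print(f"verification passed in {verify():.2f}s")
```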

Deployment Notes

Before rolling this pattern into production, add one small automated regression check tied to your most critical user path. Keep the check deterministic and fast, and run it on every dependency or configuration change. This extra guardrail catches subtle behavior drift that static review often misses, especially when environments differ between local machines and CI runners.

Summary

To collect subprocess stdout asynchronously in Python, use asyncio.create_subprocess_exec with piped streams and consume stdout and stderr concurrently. Add timeout handling, explicit process cleanup, and structured logging for production usage. This approach scales from local automation scripts to long-running async services while keeping output streaming reliable and non-blocking.

