How to use foreach or foreachBatch in PySpark to write to a database

Introduction

When writing streaming data from PySpark into a database, foreachBatch is usually the right tool for Structured Streaming. It lets you work with each micro-batch as a normal DataFrame, which fits database writes much better than row-by-row foreach logic.

foreach and foreachBatch Solve Different Problems

foreach operates on individual rows. In PySpark it accepts either a function or an object with open, process, and close methods. Either way, you are responsible for per-row connection handling, error handling, retries, and performance.

foreachBatch operates on an entire micro-batch DataFrame. That gives you the chance to use bulk writes, JDBC output, merge logic, or transactional database patterns.

A practical rule is:

  • use foreach only when you genuinely need row-level side effects
  • use foreachBatch for database writes in most Structured Streaming jobs

foreachBatch Example

Here is the usual pattern for a database sink.

python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("stream-to-db").getOrCreate()

# The built-in "rate" source emits (timestamp, value) rows, which is handy for testing.
stream_df = (
    spark.readStream
    .format("rate")
    .option("rowsPerSecond", 5)
    .load()
)


def write_batch(batch_df, batch_id):
    # batch_df is a regular DataFrame, so the ordinary batch JDBC writer works here.
    (batch_df.write
        .format("jdbc")
        .option("url", "jdbc:postgresql://localhost:5432/demo")
        .option("dbtable", "stream_events")
        .option("user", "postgres")
        .option("password", "secret")
        .option("driver", "org.postgresql.Driver")
        .mode("append")
        .save())


# Spark calls write_batch once per micro-batch.
query = (
    stream_df.writeStream
    .foreachBatch(write_batch)
    .outputMode("append")
    .start()
)

query.awaitTermination()

This is much easier to reason about than opening a database connection for each row.

Why foreachBatch Is Usually Better for Databases

Databases are optimized for batched operations, not thousands of tiny independent writes. foreachBatch gives you:

  • one micro-batch at a time
  • DataFrame operations before the write
  • easier use of JDBC writers
  • simpler idempotency and merge patterns

That is why it is the standard choice for database sinks in Spark Structured Streaming.

Use foreach Only for True Per-Row Side Effects

If you still need per-row behavior, foreach is possible, but you must own the operational complexity.

python
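class RowWriter:
    def open(self, partition_id, epoch_id):
        # Called once per partition and epoch; return True to process its rows.
        return True

    def process(self, row):
        # Called once for every row in the partition.
        print(row)

    def close(self, error):
        # Called when the partition finishes; error is None on success.
        pass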

That pattern is useful for special sinks or custom side effects, but it is rarely the best design for relational database writes.
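
To make that operational complexity concrete, here is a minimal sketch of a writer that opens one connection per partition in open, inserts in process, and commits or rolls back in close. It uses Python's built-in sqlite3 module purely as a stand-in for a real database driver. The class name SqliteRowWriter, the db_path argument, and the events table are all hypothetical, and a local SQLite file would only make sense when Spark runs in local mode.

```python
import sqlite3


class SqliteRowWriter:
    """Row-level sink sketch for writeStream.foreach (sqlite3 is a stand-in driver)."""

    def __init__(self, db_path):
        # Keep only picklable state here; Spark serializes this object to executors.
        self.db_path = db_path
        self.conn = None

    def open(self, partition_id, epoch_id):
        # One connection per partition and epoch, not one per row.
        self.conn = sqlite3.connect(self.db_path)
        self.conn.execute("CREATE TABLE IF NOT EXISTS events (value INTEGER, ts TEXT)")
        return True  # True tells Spark to call process() for each row

    def process(self, row):
        # Works with pyspark.sql.Row or any mapping that has these two keys.
        self.conn.execute(
            "INSERT INTO events (value, ts) VALUES (?, ?)",
            (row["value"], str(row["timestamp"])),
        )

    def close(self, error):
        # Commit only if the partition finished without an error.
        if self.conn is None:
            return
        if error is None:
            self.conn.commit()
        else:
            self.conn.rollback()
        self.conn.close()
        self.conn = None
```

Under these assumptions it would be attached with stream_df.writeStream.foreach(SqliteRowWriter(path)).start(), where path is whatever location the stand-in database lives at. Even in this small form, the connection lifecycle and the failure handling are code you have to write and test yourself.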

Idempotency Matters

Streaming queries can restart, retry, or replay data depending on failure timing, and foreachBatch by default provides only at-least-once write guarantees. A database sink should therefore be idempotent or at least resilient to duplicate delivery.

Typical strategies include:

  • writing to a staging table and merging
  • using upserts keyed by a business identifier
  • recording processed batch IDs in the sink

Without this, a restarted query can insert duplicate rows even if the streaming logic itself is correct.
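
The third strategy, recording processed batch IDs, can be sketched as follows. This is an illustration under stated assumptions rather than a production sink: sqlite3 stands in for the real database, the function name write_batch_once and the processed_batches table are hypothetical, and the approach relies on the query restarting from the same checkpoint so that a replayed micro-batch arrives with the same batch ID.

```python
import sqlite3


def write_batch_once(conn, batch_id, rows):
    """Apply one micro-batch at most once, using a ledger of processed batch IDs."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS processed_batches (batch_id INTEGER PRIMARY KEY)"
    )
    conn.execute("CREATE TABLE IF NOT EXISTS stream_events (value INTEGER, ts TEXT)")
    with conn:  # one transaction: commits on success, rolls back on any exception
        seen = conn.execute(
            "SELECT 1 FROM processed_batches WHERE batch_id = ?", (batch_id,)
        ).fetchone()
        if seen:
            return False  # replayed batch: skip it so no duplicates are written
        conn.executemany(
            "INSERT INTO stream_events (value, ts) VALUES (?, ?)",
            [(r["value"], str(r["timestamp"])) for r in rows],
        )
        conn.execute(
            "INSERT INTO processed_batches (batch_id) VALUES (?)", (batch_id,)
        )
    return True


def write_batch(batch_df, batch_id):
    # foreachBatch callback. collect() pulls the whole micro-batch to the driver,
    # which is only reasonable for small batches. The file path is hypothetical.
    conn = sqlite3.connect("/tmp/stream_demo.db")
    try:
        write_batch_once(conn, batch_id, batch_df.collect())
    finally:
        conn.close()
```

The key design choice is that the data rows and the batch ID are written in the same transaction, so a failure between the two cannot leave the ledger and the data out of sync.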

Transform Before Writing

Because foreachBatch gives you a normal DataFrame, it is the right place to clean or aggregate data before sending it to the database.

python
def write_batch(batch_df, batch_id):
    # Keep only the columns the target table needs before writing.
    cleaned = batch_df.selectExpr("value", "timestamp")
    (cleaned.write
        .format("jdbc")
        .option("url", "jdbc:postgresql://localhost:5432/demo")
        .option("dbtable", "stream_events")
        .option("user", "postgres")
        .option("password", "secret")
        .option("driver", "org.postgresql.Driver")
        .mode("append")
        .save())

This is another reason foreachBatch is stronger than row-level code for database workflows.

Database Connections and Parallelism

Even with foreachBatch, writing to a database is not infinitely scalable. The database may become the bottleneck before Spark does.

Be deliberate about:

  • the number of output partitions
  • JDBC batch size settings
  • transaction size
  • database indexes and constraints

Spark can produce data quickly, but the sink still needs to absorb it safely.
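
As a rough sketch of the first two knobs, the helper below caps the number of output partitions with coalesce and sets the JDBC batchsize option. The helper names jdbc_write_options and make_batch_writer, and the default values of 1000 rows and 4 partitions, are illustrative assumptions; suitable values depend on the target database.

```python
def jdbc_write_options(url, table, user, password, batch_size=1000):
    """Build JDBC writer options; batchsize sets how many rows go in each insert batch."""
    return {
        "url": url,
        "dbtable": table,
        "user": user,
        "password": password,
        "driver": "org.postgresql.Driver",
        "batchsize": str(batch_size),
    }


def make_batch_writer(options, max_partitions=4):
    """Return a foreachBatch callback that caps concurrent database connections."""

    def write_batch(batch_df, batch_id):
        # Each output partition writes over its own JDBC connection, so
        # coalesce() bounds how many connections hit the database at once.
        (batch_df.coalesce(max_partitions)
            .write
            .format("jdbc")
            .options(**options)
            .mode("append")
            .save())

    return write_batch
```

It would be wired in with stream_df.writeStream.foreachBatch(make_batch_writer(jdbc_write_options(url, table, user, password))).start(). Lowering max_partitions trades write throughput for fewer simultaneous connections on the database side.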

Common Pitfalls

A common mistake is using row-level foreach for a relational database when a batch write would be simpler and faster.

Another mistake is ignoring idempotency. Streaming restarts and retries are normal, so the sink must tolerate them.

Developers also often treat foreachBatch like ordinary batch ETL without remembering that the query may run indefinitely and emit many separate micro-batches.

Finally, do not assume that because Spark is distributed, the target database will scale automatically with the same write pattern.

Summary

  • foreachBatch is usually the best choice for writing Structured Streaming data to a database.
  • It lets you use normal DataFrame transformations and batch-oriented JDBC writes.
  • Use row-level foreach only when the sink truly requires per-record side effects.
  • Make database writes idempotent so retries and restarts do not corrupt results.
  • Treat the database as a constrained sink and design partitioning and write volume accordingly.
