Kafka
Programming
Thread Management
Software Development
Coding Techniques

How to write Kafka consumers - single threaded vs multi threaded

Master System Design with Codemia

Enhance your system design skills with over 120 practice problems, detailed solutions, and hands-on exercises.

Apache Kafka is a popular distributed streaming platform that allows systems to publish, subscribe to, and process streams of records in real time. Writing effective Kafka consumers is crucial for efficient data processing. Kafka consumers can be implemented in various ways, but one key decision point is whether to use single-threaded or multi-threaded architectures. In this article, we will explore both approaches, along with practical examples and technical details to help you make an informed decision.

Single-Threaded Kafka Consumers

A single-threaded Kafka consumer uses one thread to poll and process records from the Kafka broker. This approach simplifies the design as it does not involve any concurrency or synchronization overhead.

Key Benefits:

  • Simplicity: Easier to implement and debug because it avoids complexities associated with multithreading.
  • Ordering: Maintains the order of records as they are processed sequentially.

Implementation Example:

Here is a basic example using Java, assuming the Kafka client library is included:

java
1import org.apache.kafka.clients.consumer.ConsumerRecord;
2import org.apache.kafka.clients.consumer.ConsumerRecords;
3import org.apache.kafka.clients.consumer.KafkaConsumer;
4
5import java.util.Collections;
6import java.util.Properties;
7
8public class SimpleConsumer {
9    public static void main(String[] args) {
10        Properties props = new Properties();
11        props.put("bootstrap.servers", "localhost:9092");
12        props.put("group.id", "test");
13        props.put("enable.auto.commit", "true");
14        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
15        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
16
17        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
18            consumer.subscribe(Collections.singletonList("my-topic"));
19
20            while (true) {
21                ConsumerRecords<String, String> records = consumer.poll(100);
22                for (ConsumerRecord<String, String> record : records) {
23                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
24                }
25            }
26        }
27    }
28}

In the code above, the consumer polls messages continuously from "my-topic" and processes each record sequentially.

Multi-Threaded Kafka Consumers

In a multi-threaded model, multiple threads are used to poll and process records from Kafka. This approach can help to scale the consumer to process higher volumes of records efficiently.

Key Benefits:

  • Scalability: Can handle larger volumes of data by distributing the load across multiple threads.
  • Performance: Potentially faster processing through parallel execution.

Implementation Strategies:

  1. One Consumer Per Thread: Use a separate KafkaConsumer instance for each thread. Each consumer manages its own TCP connection to the brokers and maintains its own buffer.
  2. Decouple Consumption and Processing: Use a single consumer to poll the messages and then dispatch records to multiple processing threads. This avoids the overhead of having multiple consumers but requires careful handling of thread safety and record ordering.

Example of Decoupling Consumption and Processing:

Here is a simplified example in Java:

java
1import org.apache.kafka.clients.consumer.ConsumerRecord;
2import org.apache.kafka.clients.consumer.ConsumerRecords;
3import org.apache.kafka.clients.consumer.KafkaConsumer;
4
5import java.util.Collections;
6import java.util.Properties;
7import java.util.concurrent.ExecutorService;
8import java.util.concurrent.Executors;
9
10public class MultiThreadedConsumer {
11    private final KafkaConsumer<String, String> consumer;
12    private final ExecutorService executor;
13
14    public MultiThreadedConsumer(int numThreads) {
15        Properties props = new Properties();
16        props.put("bootstrap.servers", "localhost:9092");
17        props.put("group.id", "test-multi");
18        props.put("enable.auto.commit", "true");
19        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
20        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
21
22        consumer = new KafkaConsumer<>(props);
23        executor = Executors.newFixedThreadPool(numThreads);
24    }
25
26    public void start(String topic) {
27        consumer.subscribe(Collections.singletonList(topic));
28        while (true) {
29            ConsumerRecords<String, String> records = consumer.poll(100);
30            records.forEach(record ->
31                executor.submit(() -> processRecord(record))
32            );
33        }
34    }
35
36    private void processRecord(ConsumerRecord<String, String> record) {
37        System.out.printf("Thread=%s, Offset=%d, Key=%s, Value=%s%n",
38                          Thread.currentThread().getName(), record.offset(), record.key(), record.value());
39    }
40
41    public static void main(String[] args) {
42        MultiThreadedConsumer consumer = new MultiThreadedConsumer(4);
43        consumer.start("my-topic");
44    }
45}

In the above code, a single consumer instance fetches records which are then processed by a pool of worker threads.

Comparison Table

FeatureSingle-Threaded ConsumerMulti-Threaded Consumer
ComplexityLowHigh
ScalabilityLimitedHigh
PerformanceModerateHigh (depends on implementation)
Order PreservationYesNo (depends on implementation)
SuitabilityLow-volume environmentsHigh-volume environments or intensive processing tasks

Additional Considerations

  • Offset Management: Ensuring correct offset commits in multi-threaded environments is crucial to avoid data loss or duplication.
  • Error Handling: Concurrency adds complexity to error management, necessitating robust mechanisms to ensure system stability and data integrity.

In summary, choosing between a single-threaded and multi-threaded Kafka consumer architecture depends on the specific requirements and constraints of your application, such as volume of data, processing complexity, and required throughput. Proper implementation and careful attention to concurrency issues are key to leveraging the full power of Kafka consumers.


Course illustration
Course illustration

All Rights Reserved.