
Kafka 0.9: How to Re-consume Messages When Manually Committing Offsets with a KafkaConsumer

Apache Kafka 0.9 introduced a new Java KafkaConsumer API that gives applications finer control over offsets, including the ability to commit them manually. Manual commits are particularly useful when you need at-least-once processing or must re-consume messages after a processing failure. Here’s how you can manage and re-consume messages by controlling the commits yourself.

Understanding Manual Offset Committing

In Kafka, an offset identifies a record's position within a partition, and the committed offset records how far a consumer group has read: it is the offset of the next record the group will receive after a restart or rebalance. Unlike automatic offset committing, manual committing gives you explicit control over when a message counts as processed and its offset is committed. This is crucial when a message must be re-read, for example after a rollback caused by a processing failure.

Configuration for Manual Committing

To use manual offset committing, you must configure your Kafka consumer accordingly. Below is an example of how to set up a consumer for manual offset commits in Java:

java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;

public class ManualCommitConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "false");  // Disable auto-commit
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList("test-topic"));

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    // Process each record
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                }
                // commitSync() with no arguments commits the offsets returned by the
                // last poll(), so call it only after the whole batch has been processed.
                consumer.commitSync();
            }
        }
    }
}

Re-consuming Messages

Whether a message can be re-consumed after a failure depends on when you commit its offset:

  1. Committing after processing: The message is not marked as processed until it has been fully dealt with. If a failure occurs, every message after the last committed offset is re-consumed when the consumer restarts or its partitions are reassigned.
  2. Committing before processing: This is risky unless combined with some external record of processing status, because if processing fails after the commit, the message will not be re-consumed.

For critical scenarios where no message may be lost, committing after processing is safer. See an example below using the commitSync method after processing:

java
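import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.util.Collections;

ConsumerRecords<String, String> records = consumer.poll(100);
for (TopicPartition tp : records.partitions()) {
    for (ConsumerRecord<String, String> record : records.records(tp)) {
        try {
            process(record);  // Placeholder for your own processing logic
            // Commit only this record; the committed offset is the next offset to read
            consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(record.offset() + 1)));
        } catch (Exception e) {
            // Log the failure, then rewind so the next poll() returns this record again
            consumer.seek(tp, record.offset());
            break;  // Skip the rest of this partition's records in the batch
        }
    }
}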
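
The effect of committing only after successful processing can be illustrated with a toy in-memory model of a single partition. This is a sketch in plain Java, not the Kafka client API: ToyPartition, runSession, and committedOffset are hypothetical names, and a "crash" is simulated by simply stopping the loop before the commit.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy in-memory model of one partition read by one consumer group.
// Not the Kafka client API; all names here are hypothetical.
class ToyPartition {
    private final List<String> log;
    private long committed = 0; // next offset the group reads after a restart

    ToyPartition(List<String> log) {
        this.log = log;
    }

    // Simulates one consumer session: starts at the committed offset,
    // processes records in order, commits offset + 1 after each success,
    // and stops (as if crashing) when it reaches failAtOffset.
    List<String> runSession(long failAtOffset) {
        List<String> processed = new ArrayList<>();
        for (long offset = committed; offset < log.size(); offset++) {
            if (offset == failAtOffset) {
                break; // crash before the commit: this offset stays uncommitted
            }
            processed.add(log.get((int) offset));
            committed = offset + 1; // commit only after successful processing
        }
        return processed;
    }

    long committedOffset() {
        return committed;
    }

    public static void main(String[] args) {
        ToyPartition p = new ToyPartition(Arrays.asList("a", "b", "c", "d"));
        System.out.println(p.runSession(2));   // session that fails at offset 2
        System.out.println(p.committedOffset());
        System.out.println(p.runSession(-1));  // restarted session, no failure
    }
}
```

In this model the first session processes "a" and "b" and leaves the committed offset at 2, so the restarted session picks up again at "c", the record whose processing failed.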

Table: Key Points in Manual Offset Committing and Re-Consuming Messages

| Feature | Description | Use Case | Notes |
|---|---|---|---|
| Manual Offset Commit | Commit offsets manually | Precise control over message consumption | Use commitSync() for synchrony and commitAsync() for asynchrony |
| Re-Consuming | Restart consuming from last committed offset | Processing failure recovery | Ensure idempotence in message processing |
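
The note on idempotence matters because a re-consumed message reaches the processing code a second time. One possible way to guard against duplicate side effects is sketched below in plain Java. IdempotentHandler and its handle method are hypothetical names, and the sketch assumes the last applied offset per partition can be kept next to the processing results (here just an in-memory map, which would not survive a restart in a real deployment).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical helper that skips records which were already applied.
class IdempotentHandler {
    // Highest offset already applied, per partition. A real system would
    // keep this in the same durable store as the processing results.
    private final Map<Integer, Long> lastApplied = new HashMap<>();

    // Runs the side effect only if this offset has not been applied yet.
    // Returns true if the record was applied, false if it was a duplicate.
    boolean handle(int partition, long offset, String value, Consumer<String> sideEffect) {
        Long last = lastApplied.get(partition);
        if (last != null && offset <= last) {
            return false; // re-delivered record: skip it
        }
        sideEffect.accept(value);
        lastApplied.put(partition, offset);
        return true;
    }
}
```

A consumer loop would call handle(record.partition(), record.offset(), record.value(), ...) for every record, so a record that is delivered again after a rewind or restart is ignored instead of being applied twice.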

Additional Considerations

  • Error Handling: Proper error handling around manual commits is essential. Commit too early and a message whose processing later fails is skipped; fail to commit after successful processing and the message is processed again.
  • Performance Impact: Frequent commits can reduce consumer throughput, since each commitSync() call blocks until the broker responds. It’s often good practice to commit after a batch of messages has been processed, rather than after every message.
  • Offset Storage: The offsets are stored in an internal Kafka topic named __consumer_offsets. Ensure this topic is replicated and highly available so that committed offsets are not lost.
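
To commit per batch without losing track of what was actually processed, a consumer can remember the next offset to commit for each partition and commit that snapshot once per batch. The following is a minimal sketch in plain Java under stated assumptions: OffsetTracker is a hypothetical helper, not part of the Kafka client, and partitions are keyed by a "topic-partition" string purely for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: tracks, per partition, the offset to commit next.
class OffsetTracker {
    private final Map<String, Long> nextOffsets = new HashMap<>();

    // Call after a record has been processed successfully.
    void markProcessed(String topic, int partition, long offset) {
        String key = topic + "-" + partition;
        // The committed offset is the position of the NEXT record to read,
        // so store offset + 1 and never move backwards.
        nextOffsets.merge(key, offset + 1, Long::max);
    }

    // Snapshot of the offsets to commit once the batch is done.
    Map<String, Long> offsetsToCommit() {
        return new HashMap<>(nextOffsets);
    }
}
```

With the Kafka client, each entry of the snapshot would be converted to a TopicPartition and an OffsetAndMetadata and passed to commitSync(Map) after the batch, which keeps the number of commit requests to one per poll.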

Implementing manual offset control provides a robust way to manage consumer behavior: a message counts as consumed only when the application logic says so. This gives at-least-once delivery rather than exactly-once, so processing should tolerate duplicates. Such control, while slightly more complex, improves your system's reliability and fault tolerance when consuming Kafka messages.

