How to Make a Kafka Consumer Read from the Last Consumed Offset Instead of the Beginning

Apache Kafka is a distributed streaming platform capable of handling trillions of events a day. A fundamental piece of that capability is durable consumer offset tracking: Kafka records how far each consumer group has read, so consumers can continue from where they left off after a restart or rebalance instead of re-reading the topic from the beginning. With the default commit behavior this gives at-least-once delivery, so a few records may be processed again after a failure, but none are skipped as long as offsets are committed only after processing. This behavior is especially important for applications that require high reliability and consistency. Below, we explore how to configure a Kafka consumer to read from the last consumed offset rather than from the beginning.

Understanding Consumer Groups and Offsets

In Kafka, consumers are typically organized into consumer groups. Each partition of a topic is assigned to exactly one consumer in the group, and the group collectively covers the entire topic. Each message in a partition is assigned a unique sequential ID called an offset. For each group and partition, Kafka stores a committed offset in a special internal topic named __consumer_offsets. The committed offset is the position of the next message the group should read, that is, the offset of the last processed message plus one. When a consumer in a group restarts, it resumes reading from this stored offset.
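
To make the bookkeeping concrete, here is a minimal in-memory sketch of how a committed offset relates to the messages a group has processed. It is not Kafka's implementation: `OffsetStore` and its methods are hypothetical names used only for illustration, and the string key is a simplification. The one real convention it models is that the committed value is the offset of the next message to read.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical in-memory stand-in for the __consumer_offsets topic.
// Real Kafka stores this durably on the brokers; this only models the semantics.
final class OffsetStore {
    // Key "group/topic/partition" -> offset of the NEXT message to read.
    private final Map<String, Long> committed = new HashMap<>();

    private static String key(String group, String topic, int partition) {
        return group + "/" + topic + "/" + partition;
    }

    // Record that everything up to and including lastProcessedOffset is done.
    void commitAfterProcessing(String group, String topic, int partition, long lastProcessedOffset) {
        committed.put(key(group, topic, partition), lastProcessedOffset + 1);
    }

    // Where a restarted consumer in this group would resume, if anything was committed.
    OptionalLong resumePosition(String group, String topic, int partition) {
        Long next = committed.get(key(group, topic, partition));
        return next == null ? OptionalLong.empty() : OptionalLong.of(next);
    }
}
```

In this model, committing after processing offset 41 for group1 means a restarted consumer in group1 resumes at offset 42, while a different group on the same partition has no stored position at all.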

Configuration Requirements

To ensure that your Kafka consumer reads from the last consumed offset, you need to correctly configure the following properties in your consumer:

  1. Group ID: Each consumer must specify a group.id that identifies the consumer group it belongs to. Kafka tracks committed offsets per group, so the ID must stay the same across restarts. A new or randomly generated group.id has no stored offsets and is treated as a brand-new group.
  2. Auto Offset Reset: The auto.offset.reset property applies only when the group has no committed offset for a partition, or when the committed offset no longer exists on the broker (for example, because retention deleted the data). When a valid committed offset exists, the consumer resumes from it regardless of this setting.
    • earliest: Start from the oldest available message in the partition if no previous offset is found for this group.
    • latest: Start from messages that are produced after the consumer joins the group. This is the default.
    • none: Throw an exception if no previous offset is found for this consumer group.
    For reading from the last consumed offset, latest or none is generally preferred, depending on how your application should behave when no offsets are saved.
  3. Enable Auto Commit: Setting enable.auto.commit to true (the default) makes the consumer commit offsets automatically, during calls to poll(), at an interval set by auto.commit.interval.ms (5000 ms by default). This relieves developers of managing offset commits manually, which reduces complexity and potential errors. The trade-off is less control: messages processed since the last automatic commit are delivered again after a crash.
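
The interaction between a committed offset and auto.offset.reset can be sketched as a small decision function. This is a simplified model under stated assumptions, not the Kafka client's code: `StartPosition`, `ResetPolicy`, and `resolve` are hypothetical names, and the real client reports the none case with its own exception types rather than IllegalStateException.

```java
import java.util.OptionalLong;

// Simplified model of how a consumer picks its starting position for one partition.
// StartPosition and resolve are hypothetical names, not part of the Kafka client API.
final class StartPosition {
    enum ResetPolicy { EARLIEST, LATEST, NONE }

    static long resolve(OptionalLong committed, ResetPolicy policy,
                        long logStartOffset, long logEndOffset) {
        // A valid committed offset always wins; the reset policy is not consulted.
        if (committed.isPresent()
                && committed.getAsLong() >= logStartOffset
                && committed.getAsLong() <= logEndOffset) {
            return committed.getAsLong();
        }
        // No committed offset, or it points outside the retained log.
        switch (policy) {
            case EARLIEST:
                return logStartOffset;
            case LATEST:
                return logEndOffset;
            default:
                throw new IllegalStateException(
                        "no valid committed offset and auto.offset.reset=none");
        }
    }
}
```

The point of the sketch is that the policy only matters on the fallback path. A group with a valid committed offset of 42 resumes at 42 whether the policy is earliest, latest, or none.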

Practical Consumer Configuration Example

Here’s an example configuration for a Kafka consumer in Java:

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class ConsumerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        // Keep group.id stable across restarts so committed offsets are found again.
        props.put("group.id", "group1");
        // Commit offsets automatically, about once per second.
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Used only when the group has no valid committed offset for a partition.
        props.put("auto.offset.reset", "latest");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic"));

            while (true) {
                // poll(Duration) replaces the deprecated poll(long) overload.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                }
            }
        }
    }
}
```

Offset Management Practices

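  • Manual Offset Control: For tighter control, set enable.auto.commit to false and commit offsets yourself with the commitSync and commitAsync methods provided by KafkaConsumer, typically after a batch of messages has been processed. commitSync blocks until the commit succeeds or fails with an unrecoverable error; commitAsync returns immediately and does not retry. This adds complexity and room for error, but it ties the committed offset to work that has actually completed.
  • Handling Rebalances: Pass a ConsumerRebalanceListener to subscribe() to react when partitions are revoked from or assigned to the consumer. Committing offsets in onPartitionsRevoked lets the consumer record its progress before another consumer takes over the partition.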
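
Why commit only after processing? The toy model below, which uses no Kafka classes, simulates one partition and a consumer that processes a batch and then commits. `CommitAfterProcessing` and its members are hypothetical names for illustration; in a real consumer the equivalent steps would be poll(), your processing code, and commitSync().

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of "process, then commit" on a single partition.
// A crash is simulated by skipping the commit step.
final class CommitAfterProcessing {
    private long committedOffset = 0;                 // next offset to read after a restart
    final List<String> processed = new ArrayList<>(); // visible side effects of processing

    // Processes up to batchSize messages starting at the committed offset.
    // If crashBeforeCommit is true, the batch is processed but never committed.
    void runOnce(List<String> partition, int batchSize, boolean crashBeforeCommit) {
        int start = (int) committedOffset;
        int end = Math.min(partition.size(), start + batchSize);
        for (int offset = start; offset < end; offset++) {
            processed.add(partition.get(offset));
        }
        if (!crashBeforeCommit) {
            committedOffset = end; // commit only after the whole batch succeeded
        }
    }

    long committedOffset() {
        return committedOffset;
    }
}
```

If a run crashes after processing but before committing, the next run starts again from the old committed offset and processes the same messages a second time. Nothing is lost, but duplicates are possible, which is why processing under this pattern should be idempotent.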

Summary Table

| Property | Recommended Value | Description |
| --- | --- | --- |
| group.id | Unique per group, stable across restarts | Identifies the consumer's group; committed offsets are tracked per group. |
| enable.auto.commit | true | If true, offsets are committed automatically. |
| auto.offset.reset | latest or none | Controls where consumption starts if no offset is saved. |

Conclusion

Handling offsets in Kafka is foundational for reliable message processing. Proper configuration and understanding of consumer groups, offset management, and the implications of various settings are critical for developing robust Kafka-based applications. Through the careful use of the settings and practices discussed, developers can enhance durability and fault tolerance in their Kafka consumer implementations.

