Kafka - How to commit offset after every message using High-Level consumer?

Apache Kafka is an open-source distributed event streaming platform that handles high volumes of data and is used to build real-time streaming data pipelines and applications. Kafka provides several consumer APIs, and the high-level consumer API is popular for its ease of use. It handles details such as tracking read offsets (the position of a consumer in a partition), rebalancing partitions among the consumers in a group, and recovering from consumer failures.

Understanding Offset Committing in Kafka

In Kafka, an offset is the position of a record within a partition of a topic. The committed offset of a consumer group marks where the group resumes reading: by convention it is the offset of the next record to consume, that is, the last processed offset plus one. Tracking this position (i.e., which messages have been consumed) is crucial for many applications, especially those that cannot afford to miss messages or process the same message more than once.

By default, the consumer commits offsets automatically at a fixed time interval, set by auto.commit.interval.ms (5 seconds by default); the interval is time-based and does not depend on the number of records consumed. Whether auto-commit is active is controlled by the enable.auto.commit property. The default setting (true) may not suit applications that need precise control over when a message is considered consumed: an offset can be committed before its message has been fully processed, so a crash may lose that message, and messages processed since the last commit are reprocessed after a restart.
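As a rough illustration of the settings involved, the sketch below builds consumer properties with auto-commit switched on or off. enable.auto.commit and auto.commit.interval.ms are real consumer configuration keys; the class and method names (CommitConfig, autoCommit, manualCommit) are hypothetical, and the broker address and group id are placeholders.

```java
import java.util.Properties;

// Hypothetical helper for illustration; not part of the Kafka client library.
class CommitConfig {

    // Settings shared by both variants. Address and group id are placeholders.
    private static Properties base() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-group");
        return props;
    }

    // Auto-commit: offsets are committed during poll(), roughly every intervalMs milliseconds.
    static Properties autoCommit(int intervalMs) {
        Properties props = base();
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", Integer.toString(intervalMs));
        return props;
    }

    // Manual commit: the application decides when an offset is committed.
    static Properties manualCommit() {
        Properties props = base();
        props.put("enable.auto.commit", "false");
        return props;
    }
}
```

Either Properties object would then be completed with deserializer settings and passed to the consumer constructor.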

Manual Offset Control with High-Level Consumers

To commit an offset after every received message, you must disable auto-commit and commit the consumer offsets yourself. The steps and configuration are as follows:

  1. Disable Auto-commit: Set enable.auto.commit to false in your consumer configuration. This prevents the consumer from committing offsets automatically.
  2. Configuring the Consumer: Set up a Kafka consumer using the high-level APIs. Here is a simple setup in Java:
java
   import java.util.Properties;
   import org.apache.kafka.clients.consumer.KafkaConsumer;

   Properties props = new Properties();
   props.put("bootstrap.servers", "localhost:9092");  // broker address
   props.put("group.id", "test-group");               // consumer group whose offsets are tracked
   props.put("enable.auto.commit", "false");          // offsets are committed manually
   props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
   props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
   KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
  3. Subscribe to Topics: Specify the topics of interest.
java
   consumer.subscribe(Arrays.asList("your-topic"));
  4. Consuming Messages and Committing Offsets: Use a loop to consume messages and commit offsets immediately after each message is processed.
java
   // Assumes `consumer` is a field of the enclosing class and that java.time.Duration,
   // java.util.Collections, org.apache.kafka.clients.consumer.* and
   // org.apache.kafka.common.TopicPartition are imported.
   while (true) {
       ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
       for (ConsumerRecord<String, String> record : records) {
           processRecord(record);  // your processing function
           commitOffset(record);   // commit only after processing has succeeded
       }
   }

   private void processRecord(ConsumerRecord<String, String> record) {
       // Process each record
   }

   private void commitOffset(ConsumerRecord<String, String> record) {
       TopicPartition partition = new TopicPartition(record.topic(), record.partition());
       // The committed offset is the next offset to read, hence offset + 1.
       OffsetAndMetadata metadata = new OffsetAndMetadata(record.offset() + 1);
       consumer.commitSync(Collections.singletonMap(partition, metadata));
   }
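
To see why the loop commits record.offset() + 1 and what happens when the consumer fails between processing and committing, the following in-memory model may help. It is a simplified sketch that uses no Kafka classes: a single partition is represented by consecutive offsets, and the class name CommitAfterEachMessage and its methods are invented for this example.

```java
import java.util.ArrayList;
import java.util.List;

// In-memory model of one partition read by one consumer group; no Kafka classes are used.
class CommitAfterEachMessage {
    private long committed = 0;                      // next offset to read after a (re)start
    final List<Long> processed = new ArrayList<>();  // every offset handed to processing

    // Consumes offsets from the committed position up to (excluding) endOffset.
    // If crashAt >= 0, the consumer "crashes" after processing that offset
    // but before committing it.
    void run(long endOffset, long crashAt) {
        for (long offset = committed; offset < endOffset; offset++) {
            processed.add(offset);       // stands in for processRecord(record)
            if (offset == crashAt) {
                return;                  // crash: the commit below never happens
            }
            committed = offset + 1;      // stands in for commitSync with offset + 1
        }
    }

    long committed() {
        return committed;
    }
}
```

In this model, a crash after processing offset 2 leaves the committed position at 2, so the restarted consumer processes offset 2 a second time and then continues. Committing after every message therefore limits reprocessing to the one in-flight message, but it does not eliminate duplicates.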

Key Points Summary

| Aspect | Detail |
|---|---|
| Offset Committing | Manual, after every message processed |
| Config Property | enable.auto.commit set to false |
| Method of Commit | commitSync(Map<TopicPartition, OffsetAndMetadata>) after processing each message |
| Processing Function | Custom function to process records as they are consumed |
| Rebalancing Concerns | Ensure that the consumer is properly responding to rebalance events |

Additional Considerations

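  • Error Handling: Wrap message processing and offset commits in error handling. If processing fails, do not commit that record's offset. commitSync can itself throw (for example, CommitFailedException when the group has already rebalanced), in which case the record may be delivered again.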
  • Performance: Committing offsets synchronously for each message can impact the performance and throughput of the consumer. Consider the trade-offs between consistency requirements and performance.
  • Rebalance Listeners: When committing manually, consider passing a ConsumerRebalanceListener to subscribe so that offsets of already processed records can be committed in onPartitionsRevoked before the partitions are reassigned.
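
One way to explore the performance trade-off is to commit every N processed records instead of every single one. The sketch below is a hypothetical bookkeeping helper (OffsetTracker and its methods are not part of Kafka, and it makes no Kafka calls): it remembers the next offset to commit per partition and reports when a commit is due. With commitEvery set to 1 it reduces to the per-message commit described in this article.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: tracks the next offset to commit per partition and
// signals when a commit is due. It does not talk to Kafka itself.
class OffsetTracker {
    private final int commitEvery;
    private int sinceLastCommit = 0;
    private final Map<String, Long> nextOffsets = new HashMap<>();

    OffsetTracker(int commitEvery) {
        if (commitEvery < 1) {
            throw new IllegalArgumentException("commitEvery must be >= 1");
        }
        this.commitEvery = commitEvery;
    }

    // Call after a record has been processed. Returns true when a commit is due.
    boolean processed(String topic, int partition, long offset) {
        nextOffsets.put(topic + "-" + partition, offset + 1);  // next offset to read
        sinceLastCommit++;
        return sinceLastCommit >= commitEvery;
    }

    // Returns the offsets to commit and resets the tracker.
    Map<String, Long> drain() {
        Map<String, Long> snapshot = new HashMap<>(nextOffsets);
        nextOffsets.clear();
        sinceLastCommit = 0;
        return snapshot;
    }
}
```

In a real consumer, the map returned by drain() would be converted to Map<TopicPartition, OffsetAndMetadata> and passed to commitSync, and it could also be drained in onPartitionsRevoked. A larger commitEvery means fewer round trips to the broker but more records reprocessed after a failure.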

In conclusion, committing offsets after every message gives fine-grained control over when a message counts as consumed: after a failure, at most the one in-flight message per partition is processed again. This is at-least-once delivery rather than exactly-once, so processing should tolerate an occasional duplicate. The approach adds complexity and costs throughput, but it suits systems where the recorded position must closely track what has actually been processed.

