How to access Kafka headers while consuming a message?

Apache Kafka is a distributed streaming platform that lets you publish, subscribe to, store, and process streams of records in real time. It is widely used for building real-time data pipelines and streaming applications. It is horizontally scalable, fault-tolerant, and fast, and it runs in production at thousands of companies.

When working with Kafka, you may need to read a record's headers: key-value pairs carried alongside the record's key and value. Headers typically convey metadata such as the origin of the message, a timestamp, or flags that drive the consumer's logic. Reading them is essential for applications that rely on this metadata for processing.
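Header values are raw bytes, so the consumer has to turn them back into typed metadata. The sketch below shows one way to do that. It assumes the headers arrive in the shape confluent_kafka's `Message.headers()` returns: a list of `(str, bytes)` tuples, or `None`.

Several details are hypothetical and not defined by Kafka:

- the header names `origin`, `sent-at-ms`, and `is-retry`;
- the convention that the timestamp is sent as ASCII digits;
- the convention that the flag is sent as `b"true"`/`b"false"`.

Kafka itself imposes no encoding on header values.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple

# Same shape as confluent_kafka's Message.headers(): a list of (key, value) tuples, or None
HeaderList = Optional[List[Tuple[str, Optional[bytes]]]]


@dataclass
class Metadata:
    origin: Optional[str] = None      # hypothetical "origin" header
    sent_at_ms: Optional[int] = None  # hypothetical "sent-at-ms" header (ASCII digits assumed)
    is_retry: bool = False            # hypothetical "is-retry" flag header


def parse_metadata(headers: HeaderList) -> Metadata:
    """Convert raw header bytes into typed fields under assumed encoding conventions."""
    meta = Metadata()
    for key, value in headers or []:  # None means the record has no headers
        if value is None:
            continue  # header present but with a null value
        text = value.decode("utf-8")
        if key == "origin":
            meta.origin = text
        elif key == "sent-at-ms":
            meta.sent_at_ms = int(text)
        elif key == "is-retry":
            meta.is_retry = text.lower() == "true"
    return meta


# Hand-built headers, so no broker is needed to try this out
example = [("origin", b"billing-service"), ("sent-at-ms", b"1700000000000"), ("is-retry", b"true")]
print(parse_metadata(example))
```

If a key repeats, the later header overwrites the earlier field here. That is one possible policy, not something Kafka prescribes.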

Understanding Kafka Message Headers

Besides its key, value, and timestamp, a Kafka record can carry zero or more headers. Headers were introduced in Kafka 0.11.0.0. Each header pairs a string key with a byte-array value, and the same key may appear more than once in a single record. Because headers travel alongside the payload rather than inside it, they let you attach metadata without altering the payload content itself.
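A record may have no headers at all, or it may repeat a key. Because of that, converting headers straight into a plain `dict` can silently drop values.

Below is a minimal sketch that groups values by key and keeps every value in arrival order. It assumes the `(str, bytes)` tuple list that confluent_kafka's `Message.headers()` returns. The header names `trace-id` and `hop` are made up for illustration.

```python
from collections import defaultdict
from typing import Dict, List, Optional, Tuple


def group_headers(
    headers: Optional[List[Tuple[str, Optional[bytes]]]],
) -> Dict[str, List[Optional[bytes]]]:
    """Group header values by key, keeping every value for repeated keys."""
    grouped: Dict[str, List[Optional[bytes]]] = defaultdict(list)
    # None means the record carries no headers at all
    for key, value in headers or []:
        grouped[key].append(value)  # values stay in the order they were sent
    return dict(grouped)


raw = [("trace-id", b"abc123"), ("hop", b"gateway"), ("hop", b"router")]
print(group_headers(raw))
# {'trace-id': [b'abc123'], 'hop': [b'gateway', b'router']}
print(group_headers(None))
# {}
```

By contrast, `dict(raw)` would keep only `b"router"` for `hop`, losing the first value.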

How to Access Headers in Kafka Consumers

How you reach the headers depends on the client library. In the Java client, they are exposed on each `ConsumerRecord`. Other clients expose them on their own message objects; in confluent_kafka for Python, that object is `Message`. Below we show how to read headers with the Java client and with the Python confluent_kafka library.

Using Java

Here’s how you can access Kafka headers in a Java application using the Kafka client library:

  1. Set Up Kafka Consumer: Initialize the KafkaConsumer object with appropriate configuration.
  2. Subscribe to a Topic: Use the subscribe method of the consumer object to listen to one or more Kafka topics.
  3. Poll Messages: Continuously poll for new messages.
  4. Access Headers: For each ConsumerRecord, use the headers() method to access the headers.
```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.header.Header;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class HeaderConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic"));
            while (true) {
                // poll(Duration) replaces the deprecated poll(long) overload
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    // Headers is an Iterable<Header>; each header has a String key and a byte[] value
                    for (Header header : record.headers()) {
                        byte[] raw = header.value(); // may be null
                        String value = (raw == null) ? "null" : new String(raw, StandardCharsets.UTF_8);
                        System.out.printf("Key: %s, Value: %s%n", header.key(), value);
                    }
                }
            }
        }
    }
}
```

Using Python

Here’s how to access Kafka headers in Python using the confluent_kafka library:

  1. Install the Library: Ensure that the confluent_kafka library is installed (pip install confluent-kafka).
  2. Create a Consumer: Configure and create an instance of Consumer.
  3. Subscribe and Poll: Subscribe to a topic and use a loop to poll messages.
  4. Extract Headers: Call headers() on each message. It returns a list of (key, value) tuples, or None when the message has no headers.
```python
from confluent_kafka import Consumer, KafkaError

conf = {
    'bootstrap.servers': "localhost:9092",
    'group.id': "python-consumer",
    'auto.offset.reset': 'earliest'
}

consumer = Consumer(conf)
consumer.subscribe(['my-topic'])

try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            # _PARTITION_EOF is only reported when 'enable.partition.eof' is set to True
            if msg.error().code() == KafkaError._PARTITION_EOF:
                continue
            print(msg.error())
            break

        # headers() returns a list of (str, bytes) tuples, or None if there are no headers
        headers = msg.headers() or []
        for key, value in headers:
            # A header value can be None, so decode defensively
            decoded = value.decode('utf-8', errors='replace') if value is not None else None
            print(f"Key: {key}, Value: {decoded}")
finally:
    consumer.close()
```
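To test header handling without a running broker, you can move the per-message logic out of the poll loop into a plain function. That function takes whatever `msg.headers()` returns.

The sketch below does this. The function name `format_headers` is hypothetical. Rendering undecodable bytes with `errors="replace"` is one possible policy among several.

```python
from typing import List, Optional, Tuple


def format_headers(headers: Optional[List[Tuple[str, Optional[bytes]]]]) -> List[str]:
    """Render headers as 'Key: ..., Value: ...' lines, tolerating None and invalid UTF-8."""
    lines = []
    for key, value in headers or []:  # msg.headers() returns None when there are no headers
        if value is None:
            shown = "<null>"
        else:
            # Invalid UTF-8 bytes become U+FFFD instead of raising UnicodeDecodeError
            shown = value.decode("utf-8", errors="replace")
        lines.append(f"Key: {key}, Value: {shown}")
    return lines


for line in format_headers([("origin", b"web"), ("blob", b"\xff\xfe"), ("empty", None)]):
    print(line)
```

Inside the poll loop, the header-printing lines would then become `for line in format_headers(msg.headers()): print(line)`. The formatting logic can be unit-tested with hand-built tuples.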

Summary Table

| Feature | Details |
|---|---|
| Kafka version | Headers introduced in version 0.11.0.0 |
| Key use cases | Storing metadata such as origin, timestamps, flags |
| Access in Java | Call headers() on ConsumerRecord |
| Access in Python | Call headers() on the message object in confluent_kafka |
| Common operations | Iterating through headers, retrieving a header value by key, decoding values |
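The Java client provides the "retrieve a header value by key" operation from the table as `Headers.lastHeader(key)`. It returns the last header added with that key, or `null` if there is none.

In confluent_kafka the headers come back as a plain list, so the lookup has to be written by hand. The sketch below mirrors the last-one-wins behavior of `lastHeader`. The function name `last_header_value` is hypothetical.

```python
from typing import List, Optional, Tuple


def last_header_value(
    headers: Optional[List[Tuple[str, Optional[bytes]]]], key: str
) -> Optional[bytes]:
    """Return the value of the last header named `key`, or None if it is absent."""
    # Scan from the end so the most recently added header wins
    for header_key, value in reversed(headers or []):
        if header_key == key:
            return value
    return None


hdrs = [("content-type", b"application/json"), ("retry", b"1"), ("retry", b"2")]
print(last_header_value(hdrs, "retry"))    # b'2'
print(last_header_value(hdrs, "missing"))  # None
```

This sketch returns `None` both for a missing key and for a header whose value is null. Java's `lastHeader` returns the `Header` object, so it can tell those cases apart. If the distinction matters, return the whole tuple instead.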

Conclusion

Headers let metadata travel with each message without touching the payload. They are useful for routing logic, for filtering messages, or simply for logging where messages came from, and reading them on the consumer side is straightforward. The Java and Python examples above show the basic pattern for adding header-based logic to your Kafka consumer applications.
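As one illustration of header-based routing, the sketch below picks a handler based on an `event-type` header. It falls back to a default handler when the header is missing or unrecognized.

Several pieces are hypothetical:

- the header name `event-type`;
- the event types `order` and `refund`;
- the handler functions.

In a real consumer you would pass `msg.headers()` and `msg.value()` from the poll loop.

```python
from typing import Callable, Dict, List, Optional, Tuple

Headers = Optional[List[Tuple[str, Optional[bytes]]]]


def handle_order(payload: bytes) -> str:    # hypothetical handler
    return f"order:{payload.decode('utf-8')}"


def handle_refund(payload: bytes) -> str:   # hypothetical handler
    return f"refund:{payload.decode('utf-8')}"


def handle_unknown(payload: bytes) -> str:  # fallback for missing or unrecognized types
    return "skipped"


ROUTES: Dict[str, Callable[[bytes], str]] = {
    "order": handle_order,
    "refund": handle_refund,
}


def route(headers: Headers, payload: bytes) -> str:
    """Dispatch on the last 'event-type' header, without deserializing the payload first."""
    event_type: Optional[str] = None
    for key, value in headers or []:
        if key == "event-type" and value is not None:
            event_type = value.decode("utf-8")  # later headers override earlier ones
    handler = ROUTES.get(event_type, handle_unknown) if event_type else handle_unknown
    return handler(payload)


print(route([("event-type", b"order")], b"42"))  # order:42
print(route(None, b"42"))                        # skipped
```

The routing decision reads only the small header, not the payload. A consumer can therefore skip or redirect messages without paying to deserialize the full value.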

