Kafka
Custom Headers
Message Brokers
Data Streaming
Application Development

Adding Custom Headers in Kafka Message

Master System Design with Codemia

Enhance your system design skills with over 120 practice problems, detailed solutions, and hands-on exercises.

Apache Kafka, an open-source stream-processing software platform developed by LinkedIn and donated to the Apache Software Foundation, is primarily designed for handling real-time data feeds. Kafka is a robust system that effectively supports custom headers in messages, which can be very beneficial in several scenarios such as message filtering, routing, or tracking context-specific metadata. This feature was introduced in Kafka version 0.11, enhancing the flexibility and usability of the Kafka message.

What Are Kafka Headers?

Kafka headers consist of key-value pairs that are associated with the message (also known as a record in Kafka terminology). Headers provide a mechanism to include metadata or additional information alongside the payload of the message without modifying the payload itself.

Adding Custom Headers

To add custom headers to a Kafka message in a producer application, the Kafka client API provides the capability. Below is a Java example demonstrating how to use the Producer API to send messages with headers:

java
1import org.apache.kafka.clients.producer.KafkaProducer;
2import org.apache.kafka.clients.producer.ProducerRecord;
3import org.apache.kafka.clients.producer.ProducerConfig;
4import org.apache.kafka.common.serialization.StringSerializer;
5import java.util.Properties;
6
7public class KafkaProducerWithHeaders {
8    public static void main(String[] args){
9        Properties props = new Properties();
10        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
11        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
12        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
13
14        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
15        
16        // Create a record with headers
17        ProducerRecord<String, String> record = new ProducerRecord<>("your-topic", "Hello, World!");
18        record.headers().add("headerKey", "headerValue".getBytes());
19
20        // Send the record
21        producer.send(record);
22        producer.close();
23    }
24}

Use Cases for Headers in Kafka

  1. Routing: Kafka headers can be used to include routing information, allowing consumers to route or filter messages based on header values without needing to deserialize the entire message.
  2. Metadata: Extra information such as message encoding, content type, or even custom flags can be stored in headers.
  3. Tracking: Headers are ideal for passing trace or monitoring identifiers that help in tracking messages as they flow through different components or microservices.

How Consumers Use Headers

On the consumer side, accessing headers is straightforward. Following is a Java example of how to consume messages and access their headers:

java
1import org.apache.kafka.clients.consumer.KafkaConsumer;
2import org.apache.kafka.clients.consumer.ConsumerRecord;
3import org.apache.kafka.clients.consumer.ConsumerConfig;
4import org.apache.kafka.common.serialization.StringDeserializer;
5import java.util.Collections;
6import java.util.Properties;
7
8public class KafkaConsumerWithHeaders {
9    public static void main(String[] args){
10        Properties props = new Properties();
11        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
12        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
13        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
14        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
15
16        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
17        consumer.subscribe(Collections.singletonList("your-topic"));
18
19        while (true) {
20            for (ConsumerRecord<String, String> record : consumer.poll(100)) {
21                record.headers().forEach(header -> {
22                    System.out.println("Key: " + header.key() + " Value: " + new String(header.value()));
23                });
24                System.out.println("Received message: " + record.value());
25            }
26        }
27    }
28}

Summary Table of Kafka Headers

PropertyDescriptionExample Usage
Header Key-Value PairsAllows storing metadata in key-value formatrecord.headers().add("key", "value")
ImmutabilityOnce set, the headers are immutable for safe transmissionN/A
SerializationUtilizes byte arrays to allow flexibility in data type"exampleValue".getBytes()
ConsumptionEasily accessible in consumer applicationsrecord.headers()

Conclusion

Kafka's support for custom headers provides a powerful mechanism to enrich messages with metadata that can drive more intelligent processing downstream. Whether used for routing, metadata encapsulation, or tracking, headers enhance the versatility and functionality of Kafka messaging solutions.


Course illustration
Course illustration

All Rights Reserved.