Kafka Consumer in C++

Apache Kafka is a popular distributed streaming platform for sending, storing, and processing streams of records. It is commonly used where high throughput and low, predictable latency are required. For C++ developers, integrating with Kafka typically means using librdkafka, a portable C/C++ Kafka client library. This article outlines how to use the Kafka consumer API in C++ with librdkafka, explores its main configuration options, and provides practical examples.

1. Introduction to Kafka Consumer API

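The Kafka Consumer API allows applications to read streams of data from topics within a Kafka cluster. In C++, you access this functionality through librdkafka, which offers both a high-level consumer with consumer-group support (RdKafka::KafkaConsumer) and a lower-level legacy consumer (RdKafka::Consumer). The examples in this article use the high-level consumer.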

2. Setting Up librdkafka

To use librdkafka with your C++ application, you'll first need to install the library. You can usually find librdkafka on package management systems like vcpkg or brew, or you can build it from source.

bash
# For macOS
brew install librdkafka

# For Debian/Ubuntu
sudo apt-get install librdkafka-dev

3. Basic Kafka Consumer with C++

Here is a simple example of how to create a Kafka consumer using C++ and librdkafka:

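cpp
#include <librdkafka/rdkafkacpp.h>

#include <csignal>
#include <iostream>
#include <string>
#include <vector>

// Cleared on SIGINT (Ctrl+C) so the loop can exit and clean up
static volatile std::sig_atomic_t running = 1;
static void handle_sigint(int) { running = 0; }

int main() {
    std::string brokers = "localhost:9092";
    std::string topicName = "test_topic";
    std::string errstr;
    std::signal(SIGINT, handle_sigint);

    RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);

    // Set the broker list and the group id (KafkaConsumer::create requires group.id)
    if (conf->set("bootstrap.servers", brokers, errstr) != RdKafka::Conf::CONF_OK ||
        conf->set("group.id", "my_consumer_group", errstr) != RdKafka::Conf::CONF_OK) {
        std::cerr << "Failed to configure consumer: " << errstr << std::endl;
        delete conf;
        return 1;
    }

    // Create consumer using accumulated global configuration
    RdKafka::KafkaConsumer *consumer = RdKafka::KafkaConsumer::create(conf, errstr);
    delete conf; // The consumer keeps its own copy of the configuration
    if (!consumer) {
        std::cerr << "Failed to create consumer: " << errstr << std::endl;
        return 1;
    }

    // Manually assign partition 0 of the topic
    std::vector<RdKafka::TopicPartition *> partitions;
    partitions.push_back(RdKafka::TopicPartition::create(topicName, 0));
    RdKafka::ErrorCode assignErr = consumer->assign(partitions);
    RdKafka::TopicPartition::destroy(partitions); // assign() copies the list
    if (assignErr != RdKafka::ERR_NO_ERROR) {
        std::cerr << "Failed to assign: " << RdKafka::err2str(assignErr) << std::endl;
    }

    while (running && assignErr == RdKafka::ERR_NO_ERROR) {
        RdKafka::Message *msg = consumer->consume(1000); // Timeout in ms
        if (msg->err() == RdKafka::ERR_NO_ERROR) {
            // The payload is raw bytes, not a null-terminated string
            std::string payload;
            if (msg->payload()) {
                payload.assign(static_cast<const char *>(msg->payload()), msg->len());
            }
            std::cout << "Read msg at offset " << msg->offset() << std::endl;
            std::cout << "Message Payload: " << payload << std::endl;
        } else if (msg->err() != RdKafka::ERR__TIMED_OUT) {
            std::cerr << "Consume error: " << msg->errstr() << std::endl;
        }

        delete msg; // Release resources
    }

    // Clean up
    consumer->close();
    delete consumer;

    return 0;
}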
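One detail of the consume loop deserves a closer look: a Kafka message payload is a sequence of bytes with an explicit length, and it is not guaranteed to be null-terminated. The sketch below shows one way to copy it into a std::string safely. The helper name payload_to_string is hypothetical (it is not part of librdkafka), and it assumes you pass it the pointer and length reported by the message object.

```cpp
#include <cstddef>
#include <string>

// Hypothetical helper (not part of librdkafka): copy exactly `len` bytes of
// a message payload into a std::string. A null or empty payload, such as a
// record with no value, yields an empty string.
std::string payload_to_string(const void *payload, std::size_t len) {
    if (payload == nullptr || len == 0) {
        return std::string();
    }
    // Use the (pointer, length) constructor so embedded '\0' bytes are kept
    // and nothing past the payload is read.
    return std::string(static_cast<const char *>(payload), len);
}
```

Inside the loop this would be called as payload_to_string(msg->payload(), msg->len()). Binary payloads still need proper decoding; this only makes the copy itself well defined.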

4. Working with Consumer Groups and Offset Management

In Kafka, consumer groups are used for scaling consumption by dividing the load of topics across multiple consumers. Each consumer within a group reads from exclusive partitions of the topic, and if a consumer fails, Kafka reassigns the partition to another consumer in the group.
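To make the idea of exclusive partitions concrete, here is a small standalone model of how partitions might be spread across group members. It is a simplified illustration only: the function name assign_round_robin is hypothetical, and real assignment is performed by the group's configured assignor, not by application code.

```cpp
#include <cstddef>
#include <map>
#include <string>
#include <vector>

// Simplified model only: distribute partition ids 0..partition_count-1
// across group members in round-robin order. Each partition ends up with
// exactly one owner.
std::map<std::string, std::vector<int>> assign_round_robin(
    const std::vector<std::string> &members, int partition_count) {
    std::map<std::string, std::vector<int>> assignment;
    if (members.empty()) {
        return assignment;
    }
    for (const std::string &member : members) {
        assignment[member]; // idle members still appear, with no partitions
    }
    for (int p = 0; p < partition_count; ++p) {
        const std::size_t owner = static_cast<std::size_t>(p) % members.size();
        assignment[members[owner]].push_back(p);
    }
    return assignment;
}
```

Calling the function again with one member removed models a failover: the remaining members pick up the partitions that the failed consumer owned.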

Here is how to set the consumer group and manage offsets:

cpp
conf->set("group.id", "my_consumer_group", errstr);
conf->set("auto.offset.reset", "earliest", errstr);
  • group.id specifies the consumer group id.
  • auto.offset.reset specifies what to do when there is no initial offset or if the current offset is invalid: earliest will reset to the earliest available offset, and latest skips to the newest messages.
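The effect of auto.offset.reset can be modeled in a few lines. The sketch below is not librdkafka code. It is a simplified model under stated assumptions: kNoCommittedOffset and resolve_start_offset are hypothetical names, and low and high stand for the first available offset and the offset of the next message to be written in a partition.

```cpp
#include <cstdint>
#include <string>

// Hypothetical sentinel meaning "the group has no stored offset"
const std::int64_t kNoCommittedOffset = -1;

// Simplified model of the auto.offset.reset decision for one partition.
// A stored offset inside [low, high] is used as-is; otherwise the policy
// decides where consumption starts.
std::int64_t resolve_start_offset(std::int64_t committed, std::int64_t low,
                                  std::int64_t high,
                                  const std::string &reset_policy) {
    const bool valid = committed != kNoCommittedOffset &&
                       committed >= low && committed <= high;
    if (valid) {
        return committed; // resume where the group left off
    }
    // "earliest" replays from the oldest retained message; anything else
    // is treated as "latest" in this model.
    return reset_policy == "earliest" ? low : high;
}
```

For a partition holding offsets 100 through 499, a new group starts at 100 with earliest and at 500 with latest, while a group with a stored offset of 250 resumes at 250 under either policy.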

5. Key Configuration Options

| Configuration Key | Default Value | Description |
|---|---|---|
| bootstrap.servers | none | A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. |
| group.id | none | A unique string that identifies the consumer group. |
| enable.auto.commit | true | If true, the consumer's offset is periodically committed in the background. |
| auto.offset.reset | latest | Controls how to reset offsets on missing offsets or when there is no initial offset. |
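Because bootstrap.servers and group.id have no default, it can help to check for them before handing settings to the client. The following is a minimal sketch, assuming the application keeps its settings in a std::map first. The Settings alias and the missing_required function are hypothetical and not part of librdkafka.

```cpp
#include <map>
#include <string>
#include <vector>

using Settings = std::map<std::string, std::string>;

// Hypothetical helper: return the keys from the table above that have no
// default value and are missing or empty in the supplied settings.
std::vector<std::string> missing_required(const Settings &settings) {
    static const char *const required[] = {"bootstrap.servers", "group.id"};
    std::vector<std::string> missing;
    for (const char *key : required) {
        Settings::const_iterator it = settings.find(key);
        if (it == settings.end() || it->second.empty()) {
            missing.push_back(key);
        }
    }
    return missing;
}
```

If the returned list is empty, each entry of the map can then be passed to conf->set(key, value, errstr), which still performs librdkafka's own validation.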

6. Handling Errors and Rebalances

Error handling and rebalance logic are critical in real-world applications. Consumers may need to handle scenarios like network errors, partition rebalances, or unplanned broker shutdowns. In librdkafka, you can implement callbacks to manage these events:

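cpp
#include <librdkafka/rdkafkacpp.h>

#include <iostream>
#include <string>
#include <vector>

class ExampleRebalanceCb : public RdKafka::RebalanceCb {
public:
    // Invoked from consume() when the group assignment changes (used with subscribe())
    void rebalance_cb(RdKafka::KafkaConsumer *consumer,
                      RdKafka::ErrorCode err,
                      std::vector<RdKafka::TopicPartition *> &partitions) override {
        if (err == RdKafka::ERR__ASSIGN_PARTITIONS) {
            consumer->assign(partitions);
        } else if (err == RdKafka::ERR__REVOKE_PARTITIONS) {
            consumer->unassign();
        } else {
            std::cerr << "Rebalance error: " << RdKafka::err2str(err) << std::endl;
            consumer->unassign();
        }
    }
};

int main() {
    std::string errstr;
    RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);

    // Register the callback before creating the consumer.
    // The callback object must outlive the consumer.
    ExampleRebalanceCb ex_rebalance_cb;
    if (conf->set("rebalance_cb", &ex_rebalance_cb, errstr) != RdKafka::Conf::CONF_OK) {
        std::cerr << "Failed to set rebalance_cb: " << errstr << std::endl;
    }

    // ... set bootstrap.servers and group.id, create the consumer, subscribe, consume ...
    delete conf;
    return 0;
}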
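Applications often keep per-partition state (buffers, caches, pending commits) that has to follow the assignment. The sketch below models that bookkeeping on its own, without librdkafka. It assumes eager rebalancing, where each assignment replaces the previous one and a revocation drops every partition, and the class name OwnedPartitions is hypothetical.

```cpp
#include <cstddef>
#include <set>
#include <vector>

// Simplified model, assuming eager rebalancing: a new assignment replaces
// the old one, and a revocation releases all partitions at once.
class OwnedPartitions {
public:
    void on_assign(const std::vector<int> &partitions) {
        owned_ = std::set<int>(partitions.begin(), partitions.end());
    }
    void on_revoke() { owned_.clear(); }
    bool owns(int partition) const { return owned_.count(partition) > 0; }
    std::size_t size() const { return owned_.size(); }

private:
    std::set<int> owned_;
};
```

In a rebalance callback, on_assign would be fed the partition ids from the assigned list and on_revoke would run just before unassign(), which is also a natural place to flush any state tied to the partitions being given up.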

7. Conclusion

Integrating Kafka with C++ through librdkafka offers powerful capabilities for message consumption. With the right configuration, solid error handling, and deliberate management of consumer groups and offsets, developers can process data efficiently in distributed systems.

As the sample code shows, implementing a Kafka consumer in C++ requires attention to each component's role and behavior, but in return it provides a flexible and robust foundation for large-scale data processing.
