
Kafka: How to Read from the __consumer_offsets Topic

Introduction

The fastest way to read from Kafka's __consumer_offsets topic is with the built-in kafka-console-consumer using Kafka's internal GroupMetadataManager formatter:

bash
kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic __consumer_offsets \
  --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" \
  --from-beginning

This decodes the internal binary format into human-readable output showing consumer group, topic, partition, and committed offset. The rest of this article covers why you would read this topic, what the records contain, how to decode them programmatically, and how to use modern alternatives like kafka-consumer-groups for common operational tasks.

What Is __consumer_offsets?

__consumer_offsets is an internal Kafka topic that stores consumer group offset commits. When a consumer calls commitSync() or commitAsync(), Kafka writes a record to this topic recording where that consumer group has read up to for each partition.

Before Kafka 0.9, offsets were stored in ZooKeeper. The migration to an internal topic improved performance and scalability because Kafka's own log-based storage handles high-throughput writes more efficiently than ZooKeeper.

Key Properties

Property               Value
Topic name             __consumer_offsets
Default partitions     50 (offsets.topic.num.partitions)
Cleanup policy         Compact (retains latest offset per key)
Replication factor     Set by offsets.topic.replication.factor (default 3)
Created automatically  Yes, on first consumer group commit
Internal topic         Yes (marked internal; some listings, such as AdminClient.listTopics() with default options, omit it)

The topic has 50 partitions by default. Each consumer group maps to exactly one of these partitions, chosen by hashing the group ID. The broker that leads that partition acts as the group coordinator for the group.

Reading with kafka-console-consumer

Basic Command

bash
kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic __consumer_offsets \
  --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" \
  --from-beginning

Output looks like:

text
[my-consumer-group,my-topic,0]::OffsetAndMetadata(offset=1542,leaderEpoch=Optional[0],metadata=,commitTimestamp=1719849600000,expireTimestamp=None)
[my-consumer-group,my-topic,1]::OffsetAndMetadata(offset=2103,leaderEpoch=Optional[0],metadata=,commitTimestamp=1719849600000,expireTimestamp=None)

Each line shows: [group, topic, partition] :: offset details.
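If you want to post-process this output in a script, the lines are regular enough to parse. The following is a minimal sketch, assuming the line layout matches the sample above exactly; the formatter's text output is not a stable interface and may differ between Kafka versions. parse_offset_line is a hypothetical helper name, not part of any Kafka tooling.

```python
import re

# Layout assumed from the sample output: [group,topic,partition]::value
# Topic names cannot contain commas, so the topic is the last comma-free field
# before the partition number; the group ID takes whatever comes before it.
LINE_RE = re.compile(
    r"^\[(?P<group>.+),(?P<topic>[^,]+),(?P<partition>\d+)\]::(?P<value>.*)$"
)
OFFSET_RE = re.compile(r"offset=(-?\d+)")
COMMIT_TS_RE = re.compile(r"commitTimestamp=(-?\d+)")


def parse_offset_line(line):
    """Parse one formatter output line into a dict, or return None if it does not match."""
    match = LINE_RE.match(line.strip())
    if match is None:
        return None
    offset = OFFSET_RE.search(match["value"])
    commit_ts = COMMIT_TS_RE.search(match["value"])
    return {
        "group": match["group"],
        "topic": match["topic"],
        "partition": int(match["partition"]),
        # A line without an offset field (for example a deleted entry) yields None
        "offset": int(offset.group(1)) if offset else None,
        "commit_timestamp": int(commit_ts.group(1)) if commit_ts else None,
    }
```

Calling parse_offset_line on the first sample line gives a dict with group "my-consumer-group", topic "my-topic", partition 0, and offset 1542.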

Reading Group Metadata (Not Just Offsets)

The __consumer_offsets topic stores two types of records: offset commits and group metadata (member assignments, protocol, leader info). To read group metadata:

bash
kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic __consumer_offsets \
  --formatter "kafka.coordinator.group.GroupMetadataManager\$GroupMetadataMessageFormatter" \
  --from-beginning

This shows consumer group membership, rebalance information, and assignment details.

Filtering for a Specific Consumer Group

The console consumer does not support filtering by key, but you can pipe the output through grep:

bash
kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic __consumer_offsets \
  --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" \
  --from-beginning 2>/dev/null \
| grep "my-consumer-group"

For more precise filtering, read the topic programmatically (covered below).
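One limitation of the grep approach is that it matches substrings, so "my-consumer-group" also matches a group named "my-consumer-group-2" or a topic that happens to contain the same text. A small filter that anchors on the start of the formatted key avoids this. This is only a sketch, assuming the [group,topic,partition] line layout shown earlier; belongs_to_group and filter_lines are hypothetical helper names.

```python
def belongs_to_group(line, group):
    """Return True if a formatter output line starts with the given group ID as its first key field."""
    # Caveat: a different group whose ID begins with "<group>," would also match,
    # because commas are legal inside group IDs.
    return line.startswith(f"[{group},")


def filter_lines(lines, group):
    """Keep only the lines whose key belongs to the given consumer group."""
    return [line for line in lines if belongs_to_group(line, group)]


# Example use with the console consumer's output piped to stdin:
#   import sys
#   for line in filter_lines(sys.stdin, "my-consumer-group"):
#       print(line, end="")
```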

Reading Programmatically with Java

To read __consumer_offsets in a Java application, you need to handle the internal binary serialization format:

java
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class OffsetReader {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "offset-reader-tool");
        // Do not commit: the reader's own commits would be written back to this topic
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // Keys and values are binary, so read them as raw bytes
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            ByteArrayDeserializer.class.getName());
        // Only affects pattern (regex) subscriptions; harmless when subscribing by name
        props.put(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG, "false");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("__consumer_offsets"));

            while (true) {
                ConsumerRecords<byte[], byte[]> records =
                    consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<byte[], byte[]> record : records) {
                    // Key and value are in Kafka's internal binary format
                    // Use GroupMetadataManager to decode (see below)
                    System.out.printf("Partition: %d, Offset: %d, Key size: %d%n",
                        record.partition(), record.offset(),
                        record.key() != null ? record.key().length : 0);
                }
            }
        }
    }
}

Decoding the Binary Format

The key and value use Kafka's internal serialization schemas. Decoding them requires Kafka's internal classes:

java
import java.nio.ByteBuffer;

public final class OffsetsKeyType {
    // Classify a record by the 2-byte version prefix at the start of its key
    static String classify(byte[] key) {
        ByteBuffer keyBuffer = ByteBuffer.wrap(key);
        short version = keyBuffer.getShort();

        if (version == 0 || version == 1) {
            // Offset commit key: contains group, topic, partition
            // Use GroupMetadataManager.readMessageKey() for full parsing
            return "offset-commit";
        } else if (version == 2) {
            // Group metadata key: contains group ID
            return "group-metadata";
        }
        return "unknown";
    }
}

In practice, GroupMetadataManager.readMessageKey() and GroupMetadataManager.readOffsetMessageValue() from Kafka's broker code are the reliable way to decode these records. However, they are internal classes rather than public API: using them pulls the heavy broker artifact into your build, and they can change or move between Kafka releases.

Python Alternative with kafka-python

python
from kafka import KafkaConsumer
import struct

consumer = KafkaConsumer(
    '__consumer_offsets',
    bootstrap_servers='localhost:9092',
    group_id='offset-reader-py',
    enable_auto_commit=False,
    auto_offset_reset='earliest',
    key_deserializer=None,    # keep raw bytes
    value_deserializer=None,  # keep raw bytes
    exclude_internal_topics=False,
)

for message in consumer:
    if message.key is None:
        continue

    # First 2 bytes of key are the version (signed big-endian int16)
    version = struct.unpack('>h', message.key[:2])[0]

    if version in (0, 1):
        # Offset commit record
        # Parse group_id, topic, partition from remaining bytes
        key_data = message.key[2:]
        group_len = struct.unpack('>h', key_data[:2])[0]
        group_id = key_data[2:2 + group_len].decode('utf-8')

        remaining = key_data[2 + group_len:]
        topic_len = struct.unpack('>h', remaining[:2])[0]
        topic = remaining[2:2 + topic_len].decode('utf-8')

        # Partition is a signed big-endian int32
        partition = struct.unpack('>i', remaining[2 + topic_len:2 + topic_len + 4])[0]

        print(f"Group: {group_id}, Topic: {topic}, Partition: {partition}")
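The loop above decodes only the key. The committed offset itself lives in the record value, which can be parsed the same way. The following is a sketch under stated assumptions: it assumes the non-flexible value schema versions 0 to 3 (version, offset, an optional leader epoch from version 3, a length-prefixed metadata string, then the commit timestamp) and rejects anything else. Newer brokers may write versions this sketch does not understand, so treat the layout as an assumption to verify against your Kafka version. decode_offset_commit_value is a hypothetical helper name.

```python
import struct


def decode_offset_commit_value(value):
    """Decode an offset commit value (assumed schema versions 0-3) into a dict."""
    if value is None:
        return None  # tombstone: the offset was deleted or expired

    # 2-byte version followed by the 8-byte committed offset
    version, offset = struct.unpack_from(">hq", value, 0)
    if not 0 <= version <= 3:
        raise ValueError(f"unsupported offset commit value version: {version}")
    pos = 10

    leader_epoch = None
    if version >= 3:
        # Assumed: leader epoch is only present from version 3 onward
        (leader_epoch,) = struct.unpack_from(">i", value, pos)
        pos += 4

    # Metadata is a string with a 2-byte length prefix
    (meta_len,) = struct.unpack_from(">h", value, pos)
    pos += 2
    metadata = value[pos:pos + meta_len].decode("utf-8")
    pos += meta_len

    (commit_timestamp,) = struct.unpack_from(">q", value, pos)

    return {
        "version": version,
        "offset": offset,
        "leader_epoch": leader_epoch,
        "metadata": metadata,
        "commit_timestamp": commit_timestamp,
    }
```

Inside the consumer loop, this would be called as decode_offset_commit_value(message.value) for records whose key version is 0 or 1.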

The Easier Alternative: kafka-consumer-groups

For most operational tasks, you do not need to read __consumer_offsets directly. The kafka-consumer-groups CLI tool provides a structured view:

List All Consumer Groups

bash
kafka-consumer-groups --bootstrap-server localhost:9092 --list

Describe a Consumer Group

bash
kafka-consumer-groups --bootstrap-server localhost:9092 \
  --describe --group my-consumer-group

Output:

text
GROUP              TOPIC      PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG   CONSUMER-ID  HOST         CLIENT-ID
my-consumer-group  my-topic   0          1542            1600            58    consumer-1   /10.0.0.1    client-1
my-consumer-group  my-topic   1          2103            2103            0     consumer-2   /10.0.0.2    client-2
my-consumer-group  my-topic   2          987             1050            63    consumer-1   /10.0.0.1    client-1

This shows current offsets, end offsets, and lag per partition without needing to decode binary records.
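The LAG column is simply LOG-END-OFFSET minus CURRENT-OFFSET. If you capture the --describe output in a script, you can recompute or aggregate it yourself. The sketch below assumes the whitespace-separated column layout from the sample above and skips rows where an offset is shown as "-"; the exact columns can vary between Kafka versions, and parse_describe_output and lag_by_partition are hypothetical helper names.

```python
def parse_describe_output(text):
    """Turn the --describe table into a list of dicts keyed by column header."""
    lines = [ln for ln in text.splitlines() if ln.strip()]
    # Locate the header row; anything before it (warnings, blank lines) is ignored
    header_idx = next(
        (i for i, ln in enumerate(lines) if ln.split()[0] == "GROUP"), None
    )
    if header_idx is None:
        return []
    header = lines[header_idx].split()
    rows = []
    for ln in lines[header_idx + 1:]:
        cells = ln.split()
        if len(cells) == len(header):
            rows.append(dict(zip(header, cells)))
    return rows


def lag_by_partition(rows):
    """Compute lag = LOG-END-OFFSET - CURRENT-OFFSET for each (topic, partition)."""
    lags = {}
    for row in rows:
        current, end = row["CURRENT-OFFSET"], row["LOG-END-OFFSET"]
        if current == "-" or end == "-":
            continue  # no committed offset (or no end offset) reported for this partition
        lags[(row["TOPIC"], int(row["PARTITION"]))] = int(end) - int(current)
    return lags


SAMPLE = """\
GROUP              TOPIC      PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG   CONSUMER-ID  HOST         CLIENT-ID
my-consumer-group  my-topic   0          1542            1600            58    consumer-1   /10.0.0.1    client-1
my-consumer-group  my-topic   1          2103            2103            0     consumer-2   /10.0.0.2    client-2
my-consumer-group  my-topic   2          987             1050            63    consumer-1   /10.0.0.1    client-1
"""
```

For the sample table, sum(lag_by_partition(parse_describe_output(SAMPLE)).values()) gives a total lag of 121 across the three partitions.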

Reset Offsets

bash
# Dry run first
kafka-consumer-groups --bootstrap-server localhost:9092 \
  --group my-consumer-group --topic my-topic \
  --reset-offsets --to-earliest --dry-run

# Execute
kafka-consumer-groups --bootstrap-server localhost:9092 \
  --group my-consumer-group --topic my-topic \
  --reset-offsets --to-earliest --execute

Comparison: Direct Read vs CLI Tool

Task                       Direct __consumer_offsets            kafka-consumer-groups CLI
View current offsets       Requires binary decoding             --describe
View lag                   Must compute manually                Shown automatically
Historical offset changes  Partial (records not yet compacted)  No (current state only)
Reset offsets              Not directly                         --reset-offsets
Custom monitoring          Yes (programmatic access)            Limited
Audit trail                Partial (records not yet compacted)  Current snapshot only

Partition Assignment

Each consumer group's offsets are stored in a specific partition of __consumer_offsets, determined by:

text
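partition = Utils.abs(groupId.hashCode()) % numPartitions

Here numPartitions is the broker setting offsets.topic.num.partitions, and Utils.abs is Kafka's own helper, which behaves like Math.abs except that it returns 0 for Integer.MIN_VALUE. With the default 50 partitions, you can predict which partition holds a group's data:

java
int hash = "my-consumer-group".hashCode();
// Mirror Kafka's Utils.abs: Math.abs(Integer.MIN_VALUE) is still negative
int partition = (hash == Integer.MIN_VALUE ? 0 : Math.abs(hash)) % 50;
System.out.println("Offsets stored in partition: " + partition);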

This is useful for debugging: if a specific __consumer_offsets partition is experiencing high latency, you can identify which consumer groups are affected.
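The same calculation can be done outside the JVM by reproducing Java's String.hashCode(). The sketch below is an illustration under assumptions: it reimplements the 32-bit hash over UTF-16 code units and maps Integer.MIN_VALUE to 0 before taking the modulus. java_string_hash and offsets_partition are hypothetical helper names, and num_partitions must match the cluster's actual offsets.topic.num.partitions.

```python
def java_string_hash(s):
    """Reproduce Java's String.hashCode() as a signed 32-bit integer."""
    data = s.encode("utf-16-be")  # Java hashes UTF-16 code units
    h = 0
    for i in range(0, len(data), 2):
        unit = int.from_bytes(data[i:i + 2], "big")
        h = (31 * h + unit) & 0xFFFFFFFF  # wrap like a 32-bit int
    # Reinterpret the unsigned result as a signed 32-bit value
    return h - 0x100000000 if h >= 0x80000000 else h


def offsets_partition(group_id, num_partitions=50):
    """Partition of __consumer_offsets expected to hold this group's records."""
    h = java_string_hash(group_id)
    # Integer.MIN_VALUE has no positive counterpart, so map it to 0
    non_negative = 0 if h == -0x80000000 else abs(h)
    return non_negative % num_partitions
```

Inverting this is what helps with debugging: run offsets_partition over the group IDs from kafka-consumer-groups --list and collect those that map to the slow partition.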

Security and Access Control

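Reading __consumer_offsets requires specific ACLs in secured Kafka clusters. The principal needs Read on the topic and, when it subscribes with a group.id, Read on that consumer group as well:

bash
# Grant read access to the internal topic
kafka-acls --bootstrap-server localhost:9092 \
  --add --allow-principal User:monitoring-app \
  --operation Read --topic __consumer_offsets

# Grant read access to the consumer group used by the reader
kafka-acls --bootstrap-server localhost:9092 \
  --add --allow-principal User:monitoring-app \
  --operation Read --group offset-reader-tool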

In most production environments, direct access to internal topics is restricted to operators and monitoring tools. Application code should use the Kafka AdminClient (or the kafka-consumer-groups CLI for scripting) instead.

Common Pitfalls

Using string deserializers. The __consumer_offsets topic stores keys and values in Kafka's internal binary format. Using StringDeserializer produces garbled output. Always use ByteArrayDeserializer and decode manually, or use the built-in formatters.

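Misunderstanding exclude.internal.topics. This consumer setting (default true) only hides internal topics from pattern (regex) subscriptions; subscribing to __consumer_offsets explicitly by name works regardless. Set it to false if you subscribe with a pattern that is meant to match __consumer_offsets.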

Reading directly in production for monitoring. Reading the entire __consumer_offsets topic from the beginning on a busy cluster generates significant I/O. For monitoring, use kafka-consumer-groups --describe or the consumers' own client-side JMX metrics, such as records-lag-max under kafka.consumer:type=consumer-fetch-manager-metrics,*, instead.

Committing offsets from the reader consumer group. If your offset-reading consumer commits its own offsets, it writes records back to __consumer_offsets, creating noise. Set enable.auto.commit=false and do not call commitSync().

Assuming compaction removes all old records immediately. Log compaction keeps the latest record per key, but compaction runs asynchronously. You may see multiple records for the same group/topic/partition when reading from the beginning. Use the latest record for each key.

Not accounting for tombstones. When a consumer group is deleted or its offsets expire, Kafka writes a tombstone (null value) to __consumer_offsets. Your consumer code must handle null values gracefully.
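The last two pitfalls can be handled together by folding the records into a map as you read them: later records overwrite earlier ones for the same key, and a tombstone removes the key. The following is a minimal sketch, assuming the records have already been decoded into (key, value) pairs in log order, where the key is a (group, topic, partition) tuple and the value is either an offset or None for a tombstone; latest_offsets is a hypothetical helper name.

```python
def latest_offsets(records):
    """Fold (key, value) pairs in log order into the current offset per key."""
    state = {}
    for key, value in records:
        if value is None:
            # Tombstone: the offset was deleted or has expired
            state.pop(key, None)
        else:
            # A later record for the same key supersedes earlier ones
            state[key] = value
    return state
```

For example, feeding it two commits for ("my-consumer-group", "my-topic", 0) with offsets 1500 and then 1542 leaves only 1542 in the result, and a subsequent None for that key removes it entirely.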

Summary

  • Use kafka-console-consumer with the OffsetsMessageFormatter for quick inspection of __consumer_offsets.
  • The topic stores two record types: offset commits (group, topic, partition, offset) and group metadata (membership, assignments).
  • For most operational tasks, kafka-consumer-groups --describe is simpler and provides lag calculations automatically.
  • Reading programmatically requires ByteArrayDeserializer; exclude.internal.topics=false is only needed for pattern subscriptions.
  • Decoding the binary format requires Kafka's internal classes or manual struct parsing.
  • Consumer group data is spread across the topic's partitions (50 by default) using a hash of the group ID.
  • Direct reads are best suited for auditing, custom monitoring, and debugging. For day-to-day operations, prefer the CLI tools or AdminClient API.
