How can I get the offset value in KStream?

Introduction

Kafka Streams makes day-to-day stream processing pleasant because the DSL focuses on keys and values, not broker metadata. That convenience also means offsets are not passed to map, filter, or foreach callbacks. If you need the offset for logging, auditing, or debugging, you have to step down to a context-aware API.

Why the DSL Does Not Hand You the Offset

A KStream record comes from a Kafka topic partition, and every record in that partition has a monotonically increasing offset. The plain DSL intentionally hides most of that metadata. In a simple operation like mapValues, Kafka Streams wants you to think in terms of transformed data, not transport details.

That is why a regular DSL lambda has no way to read the offset:

```java
stream.mapValues(value -> {
    // No offset argument is available here: ValueMapper only receives the value.
    return value.toUpperCase();
});
```

If you need topic, partition, timestamp, headers, or offset, use a transformer or processor that receives a ProcessorContext.

Reading the Offset with a Transformer

For many applications, the cleanest option is a ValueTransformerWithKey. It still plugs into the DSL, but Kafka Streams injects a ProcessorContext during initialization. That context exposes offset().

```java
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.processor.ProcessorContext;

public class OffsetExample {

    // Logs the source metadata of every record and returns the value unchanged.
    static class OffsetLoggingTransformer
            implements ValueTransformerWithKey<String, String, String> {

        private ProcessorContext context;

        @Override
        public void init(ProcessorContext context) {
            // Kafka Streams injects the context when this transformer instance is initialized.
            this.context = context;
        }

        @Override
        public String transform(String key, String value) {
            // offset() refers to the input record currently being processed.
            long offset = context.offset();
            System.out.printf(
                "topic=%s partition=%d offset=%d key=%s value=%s%n",
                context.topic(),
                context.partition(),
                offset,
                key,
                value
            );
            return value;
        }

        @Override
        public void close() {
            // Nothing to release.
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "offset-demo");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> input = builder.stream("orders");

        input.transformValues(OffsetLoggingTransformer::new)
             .to("orders-output");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        // Register the hook before start() so a shutdown during startup still closes cleanly.
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.start();
    }
}
```

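This pattern is useful when you want to keep using the DSL but need record metadata for a specific step. The offset you read is the offset of the current input record, not the offset of an output record written later in the topology. Note that ValueTransformerWithKey and transformValues are deprecated as of Kafka Streams 3.3 in favor of processValues with a FixedKeyProcessor.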
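On Kafka Streams 3.3 or later, the same step can be sketched with processValues and a FixedKeyProcessor from the newer Processor API. The sketch assumes Kafka Streams 3.3+. The class names and the orders/orders-output topics are illustrative. In the newer API, the context returns record metadata as an Optional rather than a bare offset, so the "no current record" case is explicit.

```java
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.processor.api.FixedKeyProcessor;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext;
import org.apache.kafka.streams.processor.api.FixedKeyRecord;

// Sketch assuming Kafka Streams 3.3+ (processValues / FixedKeyProcessor).
class FixedKeyOffsetExample {

    static class OffsetLoggingProcessor implements FixedKeyProcessor<String, String, String> {
        private FixedKeyProcessorContext<String, String> context;

        @Override
        public void init(FixedKeyProcessorContext<String, String> context) {
            this.context = context;
        }

        @Override
        public void process(FixedKeyRecord<String, String> record) {
            // recordMetadata() is an Optional: empty means there is no source record to report.
            context.recordMetadata().ifPresent(meta -> System.out.printf(
                "topic=%s partition=%d offset=%d key=%s value=%s%n",
                meta.topic(), meta.partition(), meta.offset(), record.key(), record.value()));
            // A FixedKeyRecord cannot change its key, so forwarding keeps the partitioning intact.
            context.forward(record);
        }
    }

    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> input = builder.stream("orders");
        input.processValues(OffsetLoggingProcessor::new).to("orders-output");
        return builder.build();
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "offset-demo");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        KafkaStreams streams = new KafkaStreams(buildTopology(), props);
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.start();
    }
}
```

Because a fixed-key processor cannot rekey, Kafka Streams does not need to mark the stream for repartitioning after this step. The same holds for transformValues.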

Using a Processor for Lower-Level Control

If your logic is more operational than transformational, a processor can be a better fit. A processor is explicit about handling each input record and is a natural place for logging or custom side effects.

```java
import org.apache.kafka.streams.processor.AbstractProcessor;

// Classic Processor API; AbstractProcessor is deprecated in Kafka Streams 3.x.
public class OffsetProcessor extends AbstractProcessor<String, String> {

    @Override
    public void process(String key, String value) {
        // AbstractProcessor keeps the ProcessorContext from init(); context() returns it.
        System.out.printf("offset=%d key=%s%n", context().offset(), key);
        // Forward the record unchanged to any child nodes.
        context().forward(key, value);
    }
}
```

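You can attach that processor with stream.process(OffsetProcessor::new). With this classic API, KStream#process is a terminal operation, so the forwarded record only reaches further nodes if you wire the processor into a Topology with addProcessor and a sink. This is heavier than a normal DSL step, but it gives you direct access to record metadata and forwarding behavior.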
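The same processor can be sketched against the newer Processor API (org.apache.kafka.streams.processor.api). This assumes Kafka Streams 3.3 or later, where KStream#process returns a KStream, so forwarded records continue into the DSL. The class and topic names are illustrative.

```java
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;

// Sketch assuming Kafka Streams 3.3+, where KStream#process(ProcessorSupplier) returns a KStream.
class ProcessApiOffsetExample {

    static class OffsetProcessor implements Processor<String, String, String, String> {
        private ProcessorContext<String, String> context;

        @Override
        public void init(ProcessorContext<String, String> context) {
            this.context = context;
        }

        @Override
        public void process(Record<String, String> record) {
            // Fall back to -1, mirroring the classic API, when no source metadata exists.
            long offset = context.recordMetadata().map(meta -> meta.offset()).orElse(-1L);
            System.out.printf("offset=%d key=%s%n", offset, record.key());
            // A general Processor may also change the key here via record.withKey(...).
            context.forward(record);
        }
    }

    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("orders")
               .process(OffsetProcessor::new)
               .to("orders-output");
        return builder.build();
    }
}
```

Mapping the Optional to -1 keeps the convention of the classic API, while still making the missing case visible at the call site.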

When the Offset May Be Missing

Offset access is tied to an input record. If code runs outside normal record processing, the metadata may not exist. In those cases Kafka Streams can return -1 for the offset.

That matters in two common cases:

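  1. Punctuation callbacks are time-driven, not tied to a specific source record. Records forwarded from a punctuation carry no source offset either.
  2. KTable#transformValues and similar KTable operations may be executed out-of-band as a result of internal DSL optimizations, so they do not always see a valid offset.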
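The punctuation case can be sketched as follows, again assuming the Kafka Streams 3.3+ Processor API. The class name and the 30-second interval are arbitrary choices for illustration. The helper treats both an empty Optional and a negative offset as "no source offset", because a punctuation has no input record behind it.

```java
import java.time.Duration;
import java.util.Optional;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.api.RecordMetadata;

// Sketch assuming Kafka Streams 3.3+; the class name and 30 s interval are illustrative.
class PunctuatingOffsetProcessor implements Processor<String, String, String, String> {
    private ProcessorContext<String, String> context;

    // Returns the source offset, or -1 when there is none (empty metadata or a placeholder offset).
    static long offsetOrUnknown(Optional<RecordMetadata> metadata) {
        return metadata.map(RecordMetadata::offset).filter(o -> o >= 0).orElse(-1L);
    }

    @Override
    public void init(ProcessorContext<String, String> context) {
        this.context = context;
        // Wall-clock punctuation fires on a timer, not because an input record arrived.
        context.schedule(Duration.ofSeconds(30), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
            long offset = offsetOrUnknown(context.recordMetadata());
            System.out.printf("punctuate at %d, offset=%d%n", timestamp, offset);
        });
    }

    @Override
    public void process(Record<String, String> record) {
        // During normal processing the metadata belongs to the current input record.
        long offset = offsetOrUnknown(context.recordMetadata());
        System.out.printf("process key=%s offset=%d%n", record.key(), offset);
        context.forward(record);
    }
}
```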

So treat offsets as contextual metadata, not as a permanent business identifier.

Common Pitfalls

One common mistake is expecting offsets inside every DSL lambda. The plain DSL callbacks, such as those passed to mapValues, filter, or peek, receive only the key and value, so reaching for them alone will not solve the problem.

Another mistake is storing offsets as if they were globally unique. Offsets are only unique within a single topic partition. If you persist them, persist the topic and partition alongside them.

A third mistake is assuming context.offset() is always valid. During punctuation or some internal optimizations, Kafka Streams may not have a current source record. Guard for -1 and decide how your application should behave in that case.
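Both of the previous two pitfalls can be handled by a small value type. It never stores an offset without its topic and partition, and it refuses placeholder values. SourcePosition is a hypothetical name, not part of Kafka Streams, and the sketch assumes Java 16+ for records.

```java
import java.util.Optional;

// Hypothetical value type (not a Kafka Streams class); requires Java 16+ for records.
// An offset is only unique within one topic partition, so all three fields travel together.
record SourcePosition(String topic, int partition, long offset) {

    // Returns empty for placeholder metadata, such as the -1 offset reported
    // when there is no current source record.
    static Optional<SourcePosition> of(String topic, int partition, long offset) {
        if (topic == null || partition < 0 || offset < 0) {
            return Optional.empty();
        }
        return Optional.of(new SourcePosition(topic, partition, offset));
    }
}
```

In a transformer or processor, SourcePosition.of(context.topic(), context.partition(), context.offset()) yields an empty Optional whenever the offset is -1. The caller then has to decide explicitly how to handle the missing position.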

Finally, avoid using the consumed offset as proof that downstream work succeeded. Processing, forwarding, and committing are related but not identical steps. If you need end-to-end guarantees, rely on Kafka Streams processing semantics and your sink behavior, not the offset alone.
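On the Kafka Streams side, the relevant setting is processing.guarantee. The sketch below assumes Kafka Streams 3.0+ and brokers 2.5+ for exactly_once_v2, and the application id and bootstrap address are placeholders. With exactly-once enabled, a task commits its consumed offsets, output records, and state-store changelog writes in one Kafka transaction. Side effects outside Kafka, such as writes to an external database, are not covered.

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

class ExactlyOnceConfig {
    // Sketch: exactly_once_v2 needs Kafka Streams 3.0+ and brokers 2.5+.
    static Properties streamsProperties() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "offset-demo");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Commit consumed offsets atomically with output and changelog writes.
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
        return props;
    }
}
```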

Summary

  • A plain KStream DSL callback does not usually expose the Kafka offset.
  • Use ValueTransformerWithKey, Transformer, or the Processor API (processValues or process on Kafka Streams 3.3+) when you need access to the processor context.
  • Read the current record offset with context.offset().
  • Expect -1 when code is not running against a real input record.
  • Treat offsets as partition-scoped metadata, not as business IDs.
