
Consume Kafka Avro Messages in Go

Apache Kafka is a popular distributed streaming platform used for building real-time data pipelines and streaming applications. It provides high throughput, built-in partitioning, replication, and fault tolerance. Kafka transports message keys and values as opaque byte arrays over a binary TCP protocol, so producers and consumers are free to agree on any serialization format, including Apache Avro.

Apache Avro is a serialization framework that integrates well with Kafka. It uses JSON for defining data structures and protocols, and serializes data in a compact binary format. This makes it highly suitable for Kafka messages due to its efficiency both in terms of storage and speed.
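
To make "compact binary format" concrete, here is a minimal sketch that hand-encodes one record with the two fields used later in this article (an int id and a string name) and compares it with the equivalent JSON. The helper name encodeUser is hypothetical and exists only for illustration; it relies on the fact that Avro writes ints and string lengths as zig-zag varints, which is the same encoding Go's binary.PutVarint produces. A real application would use an Avro library instead.

```go
package main

import (
	"encoding/binary"
	"encoding/json"
	"fmt"
)

// encodeUser hand-encodes a record with fields (id int, name string).
// Avro writes record fields in schema order with no field names or
// separators; ints and string lengths are zig-zag varints.
// Hypothetical helper for illustration only.
func encodeUser(id int32, name string) []byte {
	var tmp [binary.MaxVarintLen64]byte
	buf := make([]byte, 0, 2*binary.MaxVarintLen64+len(name))

	n := binary.PutVarint(tmp[:], int64(id)) // id as zig-zag varint
	buf = append(buf, tmp[:n]...)

	n = binary.PutVarint(tmp[:], int64(len(name))) // string length prefix
	buf = append(buf, tmp[:n]...)

	return append(buf, name...) // raw UTF-8 bytes of the string
}

func main() {
	avroBytes := encodeUser(1, "Al")
	jsonBytes, err := json.Marshal(map[string]any{"id": 1, "name": "Al"})
	if err != nil {
		panic(err)
	}

	fmt.Printf("avro: % x (%d bytes)\n", avroBytes, len(avroBytes))
	fmt.Printf("json: %s (%d bytes)\n", jsonBytes, len(jsonBytes))
}
```

For this record the Avro body is 4 bytes (02 04 41 6c) against 20 bytes of JSON, because the field names live in the schema rather than in every message.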

Why Use Avro with Kafka?

  1. Schema Evolution: Avro supports the evolution of schemas over time. You can update the schema used to write data without breaking systems that read it, as long as the change is compatible (for example, newly added fields carry default values).
  2. Strong Typing: Avro data is always read with its schema, which facilitates strong typing and schema validation.
  3. Compact Data: The binary format of Avro uses less space and incurs less overhead when transported or stored.
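
The schema evolution point can be sketched with a small check. Under Avro's resolution rules, a reader whose schema has a field the writer did not write falls back to that field's default, and fails if there is none. The sketch below, using only the standard library, lists reader fields that would break in this way. The names userV1, userV2, parseFields and missingDefaults are hypothetical, and the check covers only this one rule for flat record schemas, not full Avro compatibility.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Hypothetical schema versions: v2 adds an "email" field with a default.
const userV1 = `{"type":"record","name":"User","fields":[
	{"name":"id","type":"int"},
	{"name":"name","type":"string"}]}`

const userV2 = `{"type":"record","name":"User","fields":[
	{"name":"id","type":"int"},
	{"name":"name","type":"string"},
	{"name":"email","type":"string","default":""}]}`

// field holds the two properties of a record field this check needs.
type field struct {
	name       string
	hasDefault bool
}

// parseFields extracts field names and whether each declares a default.
func parseFields(schemaJSON string) ([]field, error) {
	var rec struct {
		Fields []map[string]json.RawMessage `json:"fields"`
	}
	if err := json.Unmarshal([]byte(schemaJSON), &rec); err != nil {
		return nil, err
	}
	fields := make([]field, 0, len(rec.Fields))
	for _, raw := range rec.Fields {
		var f field
		if err := json.Unmarshal(raw["name"], &f.name); err != nil {
			return nil, fmt.Errorf("field without a valid name: %w", err)
		}
		_, f.hasDefault = raw["default"] // key presence, so a null default counts
		fields = append(fields, f)
	}
	return fields, nil
}

// missingDefaults returns reader fields that the writer schema lacks and
// that declare no default, i.e. fields a reader could not fill in.
func missingDefaults(writerJSON, readerJSON string) ([]string, error) {
	w, err := parseFields(writerJSON)
	if err != nil {
		return nil, err
	}
	r, err := parseFields(readerJSON)
	if err != nil {
		return nil, err
	}
	written := make(map[string]bool, len(w))
	for _, f := range w {
		written[f.name] = true
	}
	var missing []string
	for _, f := range r {
		if !written[f.name] && !f.hasDefault {
			missing = append(missing, f.name)
		}
	}
	return missing, nil
}

func main() {
	missing, err := missingDefaults(userV1, userV2)
	if err != nil {
		panic(err)
	}
	fmt.Println("fields a v2 reader cannot fill from v1 data:", missing)
}
```

In practice a schema registry performs this kind of compatibility check (and more) when a new schema version is registered.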

Setting Up a Go Application to Consume Avro Messages from Kafka

Here’s how you can set up a Go application to consume data serialized in Avro format from a Kafka topic.

Prerequisites

  • Kafka broker running
  • Avro schema
  • Go environment setup

Dependencies

First, install the necessary Go packages. We'll use confluent-kafka-go for Kafka interaction and hamba/avro for Avro serialization.

bash
go get -u github.com/confluentinc/confluent-kafka-go/kafka
go get -u github.com/hamba/avro

Consuming Kafka Avro Messages

Here’s a basic example:

go
package main

import (
	"fmt"

	"github.com/confluentinc/confluent-kafka-go/kafka"
	"github.com/hamba/avro"
)

// User mirrors the Avro record. The avro tags map the struct fields
// to the lower-case field names declared in the schema.
type User struct {
	ID   int    `avro:"id"`
	Name string `avro:"name"`
}

func main() {
	// Define the Avro schema
	schema, err := avro.Parse(`{
		"type": "record",
		"name": "User",
		"fields": [
			{"name": "id", "type": "int"},
			{"name": "name", "type": "string"}
		]
	}`)
	if err != nil {
		panic(err)
	}

	// Kafka consumer configuration
	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": "localhost:9092",
		"group.id":          "myGroup",
		"auto.offset.reset": "earliest",
	})
	if err != nil {
		panic(err)
	}
	// Close the consumer when main returns.
	defer c.Close()

	if err := c.SubscribeTopics([]string{"your_topic_name"}, nil); err != nil {
		panic(err)
	}

	for {
		// A timeout of -1 blocks until a message or an error arrives.
		msg, err := c.ReadMessage(-1)
		if err != nil {
			fmt.Printf("Kafka error: %s\n", err)
			break
		}

		var user User
		// avro.Unmarshal returns a single error value.
		if err := avro.Unmarshal(schema, msg.Value, &user); err != nil {
			fmt.Printf("Avro Unmarshal error: %v\n", err)
			continue
		}
		fmt.Printf("User ID: %d, Name: %s\n", user.ID, user.Name)
	}
}

This code does the following:

  • Defines an Avro schema inline.
  • Creates a Kafka consumer and subscribes it to a topic.
  • Continuously reads messages and unmarshals each one using the Avro schema, skipping messages that fail to decode and stopping on a Kafka error.

Table Summary

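| Feature          | Details                                             |
|------------------|-----------------------------------------------------|
| Serialization    | Avro                                                |
| Language         | Go                                                  |
| Library          | confluent-kafka-go, hamba/avro                      |
| Kafka Topic      | Configurable                                        |
| Schema Evolution | Supported by Avro                                   |
| Error Handling   | Basic logging of deserialization and Kafka errors   |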

Advanced Concepts

Schema Registry

In a production environment, managing Avro schemas directly in your application code can become cumbersome. A schema registry, which provides a RESTful interface for storing and retrieving Avro schemas, simplifies this process. Confluent's Schema Registry is a popular choice.
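
One practical consequence: messages written by Confluent's Schema Registry serializers are not bare Avro. Each value starts with a 5-byte header (a zero magic byte followed by a 4-byte big-endian schema ID), and only the bytes after it are the Avro payload, so passing the whole value to an Avro decoder fails or yields garbage. The sketch below splits that header off using only the standard library. The function name splitConfluentFrame is hypothetical, and looking up the schema for the returned ID (an HTTP call to the registry) is left out.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// splitConfluentFrame separates the Schema Registry header from the Avro body.
// Layout: byte 0 is the magic byte 0x00, bytes 1-4 hold the schema ID in
// big-endian order, and the remainder is the Avro binary payload.
// Hypothetical helper for illustration only.
func splitConfluentFrame(value []byte) (schemaID uint32, payload []byte, err error) {
	if len(value) < 5 {
		return 0, nil, errors.New("message shorter than the 5-byte header")
	}
	if value[0] != 0 {
		return 0, nil, fmt.Errorf("unexpected magic byte 0x%02x", value[0])
	}
	return binary.BigEndian.Uint32(value[1:5]), value[5:], nil
}

func main() {
	// Header for schema ID 42, followed by the Avro encoding of
	// a record {id: 1, name: "Al"}.
	msg := []byte{0x00, 0x00, 0x00, 0x00, 0x2a, 0x02, 0x04, 'A', 'l'}

	id, payload, err := splitConfluentFrame(msg)
	if err != nil {
		panic(err)
	}
	fmt.Printf("schema ID %d, %d payload bytes\n", id, len(payload))
}
```

The returned payload is what would be handed to the Avro decoder, with the schema chosen by ID rather than hard-coded in the application.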

Handling Offsets

Where a consumer starts reading and when it commits its offsets affect both throughput and delivery guarantees. Commit an offset only after the corresponding message has been fully processed, so that a failure leads to reprocessing rather than data loss.

Conclusion

Consuming Kafka Avro messages in Go involves parsing the Avro schema, setting up a Kafka consumer, and deserializing each message into Go data types as it is read from the topic. Together, Kafka and Avro let applications exchange structured data compactly and evolve its schema over time while handling large volumes of messages.

