Kafka messages using Protobuf in Spring Boot

waytohksharma
1 min read · Jul 31, 2023


If you are looking to produce and consume Kafka events with Google Protocol Buffers (protobuf), you are in the right place.

Protocol Buffers is a free, open-source, cross-platform data format used to serialize structured data. It is useful for programs that communicate with each other over a network or that store structured data.

To learn more about protobuf, please go through https://protobuf.dev/

Let's create the producer configuration to produce Kafka events with protobuf. To produce protobuf messages, you need a schema registry configured.

You have to create a .proto file, from which the SimpleMessage class will be generated.

Here is the SimpleMessage.proto used for producing and consuming the Kafka events:

syntax = "proto3";

package com.aws.protobuf;

option java_outer_classname = "SimpleMessageProtos";

message SimpleMessage {
  string content = 1;
  string date_time = 2;
}
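The generated SimpleMessageProtos class comes from compiling the .proto file during the build. A minimal sketch of one common Maven setup, assuming the org.xolstice protobuf-maven-plugin together with the os-maven-plugin extension (which supplies ${os.detected.classifier}); versions here are illustrative, so adjust them to your project:

```
<plugin>
  <groupId>org.xolstice.maven.plugins</groupId>
  <artifactId>protobuf-maven-plugin</artifactId>
  <version>0.6.1</version>
  <configuration>
    <!-- Downloads a matching protoc binary instead of requiring a local install -->
    <protocArtifact>com.google.protobuf:protoc:3.23.4:exe:${os.detected.classifier}</protocArtifact>
  </configuration>
  <executions>
    <execution>
      <goals>
        <goal>compile</goal>
      </goals>
    </execution>
  </executions>
</plugin>
```

Running mvn compile then generates the Java classes from src/main/proto.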

Here is the producer configuration:

@Bean
public Producer<String, SimpleMessageProtos.SimpleMessage> producerFactory() {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");
    // Confluent serializer: registers the protobuf schema and writes the wire format
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            "io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer");
    props.put("schema.registry.url", "http://127.0.0.1:8081");
    return new KafkaProducer<>(props);
}
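It can help to know what the serializer actually puts on the wire: the Confluent serializers do not send raw protobuf bytes, but prefix each record with a magic byte 0, a 4-byte big-endian schema ID assigned by the registry, and a message-index list before the protobuf payload. A minimal sketch of that framing, using a hypothetical schema ID and payload purely to illustrate the layout:

```java
import java.nio.ByteBuffer;

public class WireFormatSketch {
    // Frame a serialized protobuf payload the way the Confluent serializer does:
    // magic byte 0x0, 4-byte big-endian schema ID, message indexes, then the payload.
    static byte[] frame(int schemaId, byte[] payload) {
        // For the first top-level message in a .proto file, the index list
        // is encoded as the single byte 0.
        ByteBuffer buf = ByteBuffer.allocate(1 + 4 + 1 + payload.length);
        buf.put((byte) 0);       // magic byte
        buf.putInt(schemaId);    // schema ID assigned by the registry
        buf.put((byte) 0);       // message-index list
        buf.put(payload);        // the actual protobuf-encoded bytes
        return buf.array();
    }

    public static void main(String[] args) {
        byte[] payload = {0x0A, 0x05, 'h', 'e', 'l', 'l', 'o'}; // hypothetical protobuf bytes
        byte[] framed = frame(42, payload);
        System.out.println(framed.length); // 6 header bytes + 7 payload bytes = 13
        System.out.println(framed[0]);     // magic byte 0
    }
}
```

This is also why a plain protobuf parser cannot read these records directly: it has to skip the header first, which is exactly what KafkaProtobufDeserializer does on the consumer side.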

Now you should be able to produce protobuf events for Kafka.

To consume the protobuf events, please create the following consumer factory:

@Bean
public ConsumerFactory<String, SimpleMessageProtos.SimpleMessage> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            "io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer");
    props.put("schema.registry.url", "http://localhost:8081");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    // Return the typed generated class instead of a DynamicMessage
    props.put(KafkaProtobufDeserializerConfig.SPECIFIC_PROTOBUF_VALUE_TYPE,
            SimpleMessageProtos.SimpleMessage.class.getName());
    return new DefaultKafkaConsumerFactory<>(props);
}
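To actually receive messages in Spring, the consumer factory is typically plugged into a listener container factory and a @KafkaListener method. A sketch of that wiring, assuming spring-kafka is on the classpath and using a hypothetical topic name simple-topic:

```
@Bean
public ConcurrentKafkaListenerContainerFactory<String, SimpleMessageProtos.SimpleMessage>
        kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, SimpleMessageProtos.SimpleMessage> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory()); // the factory defined above
    return factory;
}

// Invoked for every record on the topic; the value arrives as a typed SimpleMessage
// because SPECIFIC_PROTOBUF_VALUE_TYPE is set in the consumer configuration.
@KafkaListener(topics = "simple-topic", groupId = "group1")
public void listen(SimpleMessageProtos.SimpleMessage message) {
    System.out.println("Received: " + message.getContent());
}
```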

You are now ready to produce and consume protobuf messages with Kafka.


waytohksharma

Cloud Architect with 16+ years of experience. I enjoy doing PoC about emerging technologies, staying up-to-date on tech. Play with AWS, Java, API, Microservice.