Kafka messages using Protobuf in SpringBoot
If you are looking to produce and consume kafka events with google protobuf then you are on the right place.
Protocol Buffers is a free and open-source cross-platform data format used to serialize structured data. It is useful in developing programs to communicate with each other over a network or for storing data.
To understand more about protobuf please go through the
Lets create the producer configuration to produce kafka events with protobuf. To produce the protobuf you need the schema registery configured.
You have to create a .proto file which will generate the SimpleMessage for you.
Here is the SimpleMessage.proto which used for producing and consuming the Kafka Events:
syntax = "proto3";
package com.aws.protobuf;
option java_outer_classname = "SimpleMessageProtos";
message SimpleMessage {
string content = 1;
string date_time = 2;
}
Here is the producer configuration :
@Bean
public Producer<String, SimpleMessageProtos.SimpleMessage> producerFactory() {
Properties props = new Properties();
props.put(org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092");
props.put(org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
props.put(org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer");
props.put("schema.registry.url", "http://127.0.0.1:8081");
Producer<String, SimpleMessageProtos.SimpleMessage> producer = new KafkaProducer<String, SimpleMessageProtos.SimpleMessage>(props);
return producer;
}
Now you should be able to produce the protobuf events for Kafak.
To consume the protobuf events please create the following :
@Bean
public ConsumerFactory<String, SimpleMessageProtos.SimpleMessage> consumerFactory() {
Map props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer");
props.put("schema.registry.url", "http://localhost:8081");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(KafkaProtobufDeserializerConfig.SPECIFIC_PROTOBUF_VALUE_TYPE, SimpleMessageProtos.SimpleMessage.class.getName());
return new DefaultKafkaConsumerFactory<>(props);
}
You are good to go to produce and consume the protobuf messages using the Kafka.