Using Kafka with Spring Boot.

Ruminder Singh
3 min read · May 9, 2021


Using Kafka in Spring Boot is very easy. Spring Boot provides the Spring for Apache Kafka dependency, which offers classes to produce and consume messages to and from Kafka. In this post I will cover only the basic methods to get you started producing and consuming messages.

I have explained the different ways we can produce and consume messages. The code can be found here. Follow the instructions in the readme file to run it.

Project creation

Create a Spring Boot project using Spring Initializr and add the Web and Kafka dependencies. You can also add them later in pom.xml.

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

Kafka Producer

To send data we need a Kafka producer. There are different ways to create one: using the properties file, or using a ProducerFactory.

1. Using properties file

Three properties are required to create a producer: the Kafka broker URL (there can be more than one), and the serializers for the key and the value. You can use an application.properties or application.yml file.

spring:
  kafka:
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

Here our keys and values use String serializers, meaning the topic's keys and values will be of type String.

2. Using Kafka Producer Factory and Template

The Spring Kafka dependency provides different classes to create a Kafka producer that connects to the Kafka brokers and sends events/messages. First you need to create a ProducerFactory object, passing configuration such as the broker URL, how to serialize the keys and values, batch size and others.

@Bean
public ProducerFactory<String, String> producerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    return new DefaultKafkaProducerFactory<>(props);
}

Producing Message

After creating the ProducerFactory, we pass it to a KafkaTemplate, which has a method to send messages, among others. If you used the properties file, you only need the KafkaTemplate to produce messages to Kafka, since Spring Boot auto-configures the factory for you.

@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}

We can use kafkaTemplate.send(topic, key, value) to send messages to the Kafka brokers.
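As a minimal sketch of how this looks in practice (the service class name, the "users" topic and the key choice are my own illustrative assumptions, not from the project), a producer service can inject the template and call send:

```java
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

// Illustrative sketch; class and topic names are assumptions.
@Service
public class UserProducerService {

    private final KafkaTemplate<String, String> kafkaTemplate;

    // Spring injects the KafkaTemplate bean defined above (or the one
    // auto-configured from application.yml when using the properties file).
    public UserProducerService(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendUser(String userId, String payload) {
        // send(topic, key, value): the key decides which partition the record goes to
        kafkaTemplate.send("users", userId, payload);
    }
}
```

Records with the same key always land on the same partition, so ordering is preserved per key.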

Kafka Consumer

Similarly, a Kafka consumer can be created using the properties file or using Java configuration classes.

1. Using Properties File

spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      group-id: group_id
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

2. Using Kafka Consumer Factory and Template

Kafka consumer configuration is more complex than the producer's. You need to set up a listener that listens for events, a consumer group id and other configuration.

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    return new DefaultKafkaConsumerFactory<>(props);
}

The consumer factory has been created. Now we need to create a ConcurrentKafkaListenerContainerFactory, which creates the containers that listeners are added to.

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> listenerContainerFactory =
            new ConcurrentKafkaListenerContainerFactory<>();
    listenerContainerFactory.setConsumerFactory(consumerFactory());
    return listenerContainerFactory;
}

Once it is created, we need to add Kafka listeners that listen to the messages in Kafka for specific topics.

Consuming Message

If you are using the properties file, you only need a @KafkaListener method to consume messages; otherwise you need to write the consumer factory and listener container factory. If you will be consuming only one topic, go with the properties file; otherwise use the factory methods.

@KafkaListener(topics = "users", groupId = "group_id")
public void consume(String message) throws IOException {
    log.info("Message received is " + message);
}

We have registered a Kafka listener for the topic “users”. If you want to listen to other topics with the same key and value types (here <String, String>), you only need to change the topic name inside @KafkaListener; otherwise you need to create a new ConsumerFactory and pass it to a new ConcurrentKafkaListenerContainerFactory.

We have added group_id to the @KafkaListener. If you want two methods listening to the messages, you need to give them different group ids; if you keep the same group id, only one of the methods will receive a given message at a time.
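To sketch the different-group-id case (the class name and group ids here are illustrative, not from the project): because the two methods below belong to different consumer groups, each group gets its own copy of every message on the topic.

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

// Illustrative sketch; group ids are assumptions. With distinct group ids,
// both methods receive every message published to "users".
@Service
public class UserListeners {

    private static final Logger log = LoggerFactory.getLogger(UserListeners.class);

    @KafkaListener(topics = "users", groupId = "audit_group")
    public void consumeForAudit(String message) {
        log.info("Audit copy: " + message);
    }

    @KafkaListener(topics = "users", groupId = "notification_group")
    public void consumeForNotifications(String message) {
        log.info("Notification copy: " + message);
    }
}
```

If instead both methods used groupId = "audit_group", Kafka would treat them as one group and deliver each message to only one of them.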

In my project I have created the producer using the properties file and the consumer using Spring Kafka classes. You can add the producer and the consumer to the same project. There are different configurations you can set for the producer, like acks, linger, request timeout and compression type. I hope this gives you a start with using Kafka in Spring Boot. For any help, drop me a message. Thanks.
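For example, those producer settings can go in the same YAML file (the values below are illustrative defaults-adjacent choices, not recommendations from this post):

```yaml
spring:
  kafka:
    producer:
      acks: all                # wait for all in-sync replicas to acknowledge
      batch-size: 16384        # max bytes per batch
      compression-type: snappy # none, gzip, snappy, lz4 or zstd
      properties:
        linger.ms: 5              # wait up to 5 ms to fill a batch before sending
        request.timeout.ms: 30000 # fail a request after 30 s without a response
```

Settings without a dedicated Spring Boot key (like linger.ms) go under the properties map, which is passed through to the Kafka client as-is.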
