Monday, July 27, 2020

Kafka and Spring Boot (Spring Cloud Stream)

0. Intro

We can interact with Kafka in different ways. The default is to use the official Kafka client: we create a Kafka Producer and a Kafka Consumer, and after that we're ready to go. I had a post on this topic here. But if we're using Spring Boot, we can try a much easier option: Spring Cloud Stream. It is a more "high level" way of working with Kafka in comparison to the native client.

For this demo I created two applications in the traditional way: a Kafka Producer and a Kafka Consumer.
Let's start! 


1. pom.xml 

These files are almost the same for both projects. The only difference is the artifact name.



<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.2.RELEASE</version>
        <relativePath/>
    </parent>
    <groupId>com.demien</groupId>
    <artifactId>spboot-kafka-producer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spboot-kafka-producer</name>
    <description>Demo project for Spring Boot and Kafka</description>

    <properties>
        <java.version>11</java.version>
        <spring-cloud.version>Hoxton.SR6</spring-cloud.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-test-support</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>




2. Producer application

This application will be sending messages to Kafka. 

2.1. Entity 

Let's start with the entity class which we will be sending to Kafka. I used Lombok to make it simpler:

package com.demien.spbootkafkaproducer.domain.model;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;

@Getter
@Setter
@AllArgsConstructor
@ToString
public class TestEntity {
    private String id;
    private String name;
    private String description;
}
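For reference, the Lombok annotations above generate the usual boilerplate at compile time, so the class is roughly equivalent to writing the following by hand (the toString() format below matches what Lombok's @ToString produces, as you can see later in the producer logs):

```java
public class TestEntity {
    private String id;
    private String name;
    private String description;

    // what @AllArgsConstructor generates
    public TestEntity(String id, String name, String description) {
        this.id = id;
        this.name = name;
        this.description = description;
    }

    // what @Getter / @Setter generate (shown for one field; same pattern for the rest)
    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public String getDescription() { return description; }
    public void setDescription(String description) { this.description = description; }

    // what @ToString generates
    @Override
    public String toString() {
        return "TestEntity(id=" + id + ", name=" + name + ", description=" + description + ")";
    }
}
```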



2.2. Channel

Ok, we have an entity to send; now we have to configure a channel for sending it:

package com.demien.spbootkafkaproducer.domain.message;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface TestMessageChannels {
    @Output("myTestChannel")
    MessageChannel myTestChannel();
}



- as you can see, it's just an interface: Spring will create an implementation for it at runtime. We just have to define the "id" of this channel, which is "myTestChannel" in this particular case.

2.3. Sender service

Now we can use our channel for sending messages: we enable the binding for it, autowire it, and we're ready to go.

package com.demien.spbootkafkaproducer.domain.service;

import com.demien.spbootkafkaproducer.domain.message.TestMessageChannels;
import com.demien.spbootkafkaproducer.domain.model.TestEntity;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.messaging.Message;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.stereotype.Service;

import java.util.logging.Logger;

@Service("testProducerService")
@EnableBinding(TestMessageChannels.class)
public class TestProducerService {

    private final static Logger log = Logger.getLogger(TestProducerService.class.getName());

    private final TestMessageChannels testMessageChannels;

    @Autowired
    public TestProducerService(TestMessageChannels testMessageChannels) {
        this.testMessageChannels = testMessageChannels;
    }

    public void sendTestMessages() {
        for (int i = 0; i < 10; i++) {
            var idString = Integer.toString(i);
            var testEntity = new TestEntity(idString, "Name " + idString, "Description " + idString);
            log.info("Sending to kafka entity: " + testEntity);
            final Message<TestEntity> message = MessageBuilder.withPayload(testEntity).build();
            testMessageChannels.myTestChannel().send(message);
        }
    }
}

Not complicated, right? We just have to wrap our test entity into a Message object and send it to the channel.

2.4. Application runner

And now we can run our application, let Spring create our service (and autowire the channel), and use it:

package com.demien.spbootkafkaproducer;

import com.demien.spbootkafkaproducer.domain.service.TestProducerService;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;

@SpringBootApplication
public class SpbootKafkaProducerApplication {

    public static void main(String[] args) {
        final var applicationContext = SpringApplication.run(SpbootKafkaProducerApplication.class, args);
        new SpbootKafkaProducerApplication().run(applicationContext);
    }

    public void run(ApplicationContext applicationContext) {
        TestProducerService testProducerService = applicationContext.getBean(TestProducerService.class);
        testProducerService.sendTestMessages();
    }

}


2.5. Application Config

Remember how in #2.2 we defined a channel with id "myTestChannel"? Now we have to point it at some Kafka topic and, basically, configure Kafka as well. This should be done in the application config file (application.yml under the resources folder):

spring:
  application:
    name: test-producer
  cloud:
    stream:
      bindings:
        myTestChannel:
          destination: myTestTopic
      kafka:
        binder:
          zkNodes: localhost
          brokers: localhost


- thus, we will be sending data to a channel with id "myTestChannel", which is linked to the Kafka topic "myTestTopic".
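Incidentally, if you prefer the flat .properties format over YAML, the same binding can be expressed as dot-separated keys (one property per line):

```properties
spring.application.name=test-producer
spring.cloud.stream.bindings.myTestChannel.destination=myTestTopic
spring.cloud.stream.kafka.binder.brokers=localhost
spring.cloud.stream.kafka.binder.zkNodes=localhost
```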



3. Consumer Application

The consumer will be receiving messages from Kafka. To make it simpler I omitted the entity class: we will just print the JSON strings with our entities received from Kafka.

3.1. Input Channel

In a similar way:

package com.demien.spbootkafkaconsumer.domain.message;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface TestMessageChannels {
    @Input("myTestChannel")
    SubscribableChannel myTestChannel();
}


3.2. Consumer Service

Also in a similar way, but in addition we have to annotate a method as a stream listener:

package com.demien.spbootkafkaconsumer.domain.service;

import com.demien.spbootkafkaconsumer.domain.message.TestMessageChannels;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Service;

import java.util.logging.Logger;

@Service
@EnableBinding(TestMessageChannels.class)
public class TestConsumerService {

    private final static Logger log = Logger.getLogger(TestConsumerService.class.getName());

    @StreamListener("myTestChannel")
    public void consumeTestEntity(byte[] testEntity) {
        log.info("Received from kafka: " + new String(testEntity));
    }
}

- as you can see, we're defining a listener for our channel, and it will be triggered every time we send messages to the Kafka topic linked with this channel. Of course, in real life, here we would have to convert the JSON into our entity classes.
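In real life you would let a JSON library like Jackson (or the framework's message conversion) do that work. Just to illustrate the idea with nothing but the standard library, here is a naive and deliberately non-robust sketch of extracting the fields from such a payload; NaiveJsonParser is a hypothetical helper invented for this example, and you should use a real JSON library for anything serious:

```java
import java.util.HashMap;
import java.util.Map;

public class NaiveJsonParser {

    // Extracts flat string fields from a simple JSON object like
    // {"id":"0","name":"Name 0","description":"Description 0"}.
    // Illustrative only: no escaping, nesting, commas inside values,
    // or non-string values are supported.
    public static Map<String, String> parseFlatJson(String json) {
        Map<String, String> result = new HashMap<>();
        String body = json.trim();
        body = body.substring(1, body.length() - 1); // strip { and }
        for (String pair : body.split(",")) {
            String[] kv = pair.split(":", 2);        // "key":"value"
            result.put(unquote(kv[0]), unquote(kv[1]));
        }
        return result;
    }

    private static String unquote(String s) {
        s = s.trim();
        return s.substring(1, s.length() - 1);       // strip surrounding quotes
    }
}
```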

3.3. Application runner

It's even simpler than it was for the Producer:

package com.demien.spbootkafkaconsumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SpbootKafkaConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpbootKafkaConsumerApplication.class, args);
    }

}

3.4. Application configuration

And again we have to link our channel with the Kafka topic.

spring:
  application:
    name: test-consumer
  cloud:
    stream:
      bindings:
        myTestChannel:
          destination: myTestTopic
          group: myTestConsumerGroup
      kafka:
        bindings:
          myTestChannel:
            consumer:
              resetOffsets: true
        binder:
          zkNodes: localhost
          brokers: localhost




4. Running

First we have to run the Consumer:

java -jar target/spboot-kafka-consumer-0.0.1-SNAPSHOT.jar


And once it has started, the Producer:

java -jar target/spboot-kafka-producer-0.0.1-SNAPSHOT.jar


Producer should print something like this: 

2020-07-27 21:49:44.776  INFO 14016 --- [           main] c.d.s.d.service.TestProducerService      : Sending to kafka entity: TestEntity(id=1, name=Name 1, description=Description 1)
2020-07-27 21:49:44.788  INFO 14016 --- [           main] c.d.s.d.service.TestProducerService      : Sending to kafka entity: TestEntity(id=2, name=Name 2, description=Description 2)
2020-07-27 21:49:44.789  INFO 14016 --- [           main] c.d.s.d.service.TestProducerService      : Sending to kafka entity: TestEntity(id=3, name=Name 3, description=Description 3)
2020-07-27 21:49:44.790  INFO 14016 --- [           main] c.d.s.d.service.TestProducerService      : Sending to kafka entity: TestEntity(id=4, name=Name 4, description=Description 4)
2020-07-27 21:49:44.791  INFO 14016 --- [           main] c.d.s.d.service.TestProducerService      : Sending to kafka entity: TestEntity(id=5, name=Name 5, description=Description 5)
2020-07-27 21:49:44.793  INFO 14016 --- [           main] c.d.s.d.service.TestProducerService      : Sending to kafka entity: TestEntity(id=6, name=Name 6, description=Description 6)
2020-07-27 21:49:44.794  INFO 14016 --- [           main] c.d.s.d.service.TestProducerService      : Sending to kafka entity: TestEntity(id=7, name=Name 7, description=Description 7)
2020-07-27 21:49:44.795  INFO 14016 --- [           main] c.d.s.d.service.TestProducerService      : Sending to kafka entity: TestEntity(id=8, name=Name 8, description=Description 8)
2020-07-27 21:49:44.796  INFO 14016 --- [           main] c.d.s.d.service.TestProducerService      : Sending to kafka entity: TestEntity(id=9, name=Name 9, description=Description 9)




And Consumer: 

2020-07-27 21:47:46.351  INFO 13839 --- [container-0-C-1] c.d.s.d.service.TestConsumerService      : Received from kafka: {"id":"0","name":"Name 0","description":"Description 0"}
2020-07-27 21:47:46.365  INFO 13839 --- [container-0-C-1] c.d.s.d.service.TestConsumerService      : Received from kafka: {"id":"1","name":"Name 1","description":"Description 1"}
2020-07-27 21:47:46.367  INFO 13839 --- [container-0-C-1] c.d.s.d.service.TestConsumerService      : Received from kafka: {"id":"2","name":"Name 2","description":"Description 2"}
2020-07-27 21:47:46.367  INFO 13839 --- [container-0-C-1] c.d.s.d.service.TestConsumerService      : Received from kafka: {"id":"3","name":"Name 3","description":"Description 3"}
2020-07-27 21:47:46.369  INFO 13839 --- [container-0-C-1] c.d.s.d.service.TestConsumerService      : Received from kafka: {"id":"4","name":"Name 4","description":"Description 4"}
2020-07-27 21:47:46.369  INFO 13839 --- [container-0-C-1] c.d.s.d.service.TestConsumerService      : Received from kafka: {"id":"5","name":"Name 5","description":"Description 5"}
2020-07-27 21:47:46.370  INFO 13839 --- [container-0-C-1] c.d.s.d.service.TestConsumerService      : Received from kafka: {"id":"6","name":"Name 6","description":"Description 6"}
2020-07-27 21:47:46.371  INFO 13839 --- [container-0-C-1] c.d.s.d.service.TestConsumerService      : Received from kafka: {"id":"7","name":"Name 7","description":"Description 7"}
2020-07-27 21:47:46.372  INFO 13839 --- [container-0-C-1] c.d.s.d.service.TestConsumerService      : Received from kafka: {"id":"8","name":"Name 8","description":"Description 8"}
2020-07-27 21:47:46.372  INFO 13839 --- [container-0-C-1] c.d.s.d.service.TestConsumerService      : Received from kafka: {"id":"9","name":"Name 9","description":"Description 9"}


It works! :)



5. The end

As you can see, with Spring Boot we can interact with Kafka at a much higher level, which makes things much simpler.
Source code can be downloaded from here