0.Intro
We can interact with Kafka in different ways. Default is to use official Kafka client. By using this client we have to create Kafka Producer and Kafka Consumer and after that we're ready to go. I had a post on this topic
here. But if we're using Spring Boot we can try much easier option: Spring Cloud Stream. It is more like "high level" way in comparison to native Kafka client.
For this demo I created in traditional way 2 applications: Kafka Producer and Kafka Consumer.
Let's start!
1. pom.xml
These files are almost the same for these 2 projects. The only difference is artefact name.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.2.RELEASE</version>
<relativePath/>
</parent>
<groupId>com.demien</groupId>
<artifactId>spboot-kafka-producer</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>spboot-kafka-producer</name>
<description>Demo project for Spring Boot and Kafka</description>
<properties>
<java.version>11</java.version>
<spring-cloud.version>Hoxton.SR6</spring-cloud.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-support</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
2. Producer application
This application will be sending messages to Kafka.
2.1. Entity
Let's start from entity class which we will be sending to Kafka. I used Lombok to make it more simple:
import lombok.Setter;
@Getter
@Setter
@AllArgsConstructor
public class TestEntity {
private String id;
private String name;
private String description;
}
2.2. Channel
Ok, we have entity for sending, now we have to configure channel for sending:
package com.demien.spbootkafkaproducer.domain.message;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
public interface TestMessageChannels {
@Output("myTestChannel")
MessageChannel myTestChannel();
}
- as you can see it's just an interface - Spring will create an implementation for it in runtime. We just have to define the "id" for this channel, which is "myTestChannel" in this particular case.
2.3. Sender service
Now we can use our channel for message sending: we should enable bindings for it, autowire it and we're ready to go.
package com.demien.spbootkafkaproducer.domain.service;
import com.demien.spbootkafkaproducer.domain.message.TestMessageChannels;
import com.demien.spbootkafkaproducer.domain.model.TestEntity;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.messaging.Message;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.stereotype.Service;
import java.util.logging.Logger;
@Service("testProducerService")
@EnableBinding(TestMessageChannels.class)
public class TestProducerService {
private final static Logger log = Logger.getLogger(TestProducerService.class.getName());
private final TestMessageChannels testMessageChannels;
@Autowired
public TestProducerService(TestMessageChannels testMessageChannels) {
this.testMessageChannels = testMessageChannels;
}
public void sendTestMessages() {
for (int i=0; i<10; i++) {
var idString = Integer.toString(i);
var testEntity = new TestEntity(idString, "Name " + idString, "Description " + idString);
log.info("Sending to kafka entitiy: " + testEntity);
final Message<TestEntity> message = MessageBuilder.withPayload(testEntity).build();
testMessageChannels.myTestChannel().send(message);
}
}
}
Not complicated, right? We just have to wrap our test entity into Message object and send it into channel.
2.4. Application runner
And now we can run our application, let the Spring to create out service (and autowire channel) and we can use it:
package com.demien.spbootkafkaproducer;
import com.demien.spbootkafkaproducer.domain.service.TestProducerService;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
@SpringBootApplication
public class SpbootKafkaProducerApplication {
public static void main(String[] args) {
final var applicationContext = SpringApplication.run(SpbootKafkaProducerApplication.class, args);
new SpbootKafkaProducerApplication().run(applicationContext);
}
public void run(ApplicationContext applicationContext) {
TestProducerService testProducerService = applicationContext.getBean(TestProducerService.class);
testProducerService.sendTestMessages();
}
}
2.5. Application Config
Remember, we defined in #2.2 channel with id "myTestChannel"? Now we have to point it to some Kafka topic. And, basically, configure Kaffka as well. This should be done in application config file(application.properties under resources folder)
spring:
application:
name: test-producer
cloud:
stream:
bindings:
myTestChannel:
destination: myTestTopic
kafka:
binder:
zkNodes: localhost
binder:
brokers: localhost
- thus, we will be sending data to some channel with id "myTestChannel" which is linked to Kafka topic "myTestTopic".
3. Consumer Application
Consumer will be receiving messages from Kafka. To make consumer simpler I omitted entity class: we will be just printing JSONs with our entities received from Kafka.
3.1. Input Channel
In a similar way:
package com.demien.spbootkafkaconsumer.domain.message;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.MessageChannel;
public interface TestMessageChannels {
@Input("myTestChannel")
MessageChannel myTestChannel();
}
3.2 Consumer Service
Also, in a similar way but in addition we have to annotate some method as "Stream Listener":
package com.demien.spbootkafkaconsumer.domain.service;
import com.demien.spbootkafkaconsumer.domain.message.TestMessageChannels;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Service;
import java.util.logging.Logger;
@Service
@EnableBinding(TestMessageChannels.class)
public class TestConsumerService {
private final static Logger log = Logger.getLogger(TestConsumerService.class.getName());
@StreamListener("myTestChannel")
public void consumeTestEntity(byte[] testEntity) {
log.info("Received from kafka: " + new String(testEntity));
}
}
- as you can see, we're defining listener for our channel and it will triggered every time we're sending messages to Kafka topic linked with this channel. Of course, in real life, here we have to convert JSONs into our entity classes.
3.3. Application runner
It even simpler than it was for Producer:
package com.demien.spbootkafkaconsumer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class SpbootKafkaConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(SpbootKafkaConsumerApplication.class, args);
}
}
3.4. Application configuration
And again we have to link out channel with Kafka topic.
spring:
application:
name: test-producer
cloud:
stream:
bindings:
myTestChannel:
destination: myTestTopic
consumer:
resetOffsets: true
group:
myTestConsumerGroup
kafka:
binder:
zkNodes: localhost
binder:
brokers: localhost
4. Running
First we have to run Consumer:
java -jar target/spboot-kafka-consumer-0.0.1-SNAPSHOT.jar
And when it was started - Producer:
java -jar target/spboot-kafka-producer-0.0.1-SNAPSHOT.jar
Producer should print something like this:
20-07-27 21:49:44.776 INFO 14016 --- [ main] c.d.s.d.service.TestProducerService : Sending to kafka entitiy: TestEntity(id=1, name=Name 1, description=Description 1)
2020-07-27 21:49:44.788 INFO 14016 --- [ main] c.d.s.d.service.TestProducerService : Sending to kafka entitiy: TestEntity(id=2, name=Name 2, description=Description 2)
2020-07-27 21:49:44.789 INFO 14016 --- [ main] c.d.s.d.service.TestProducerService : Sending to kafka entitiy: TestEntity(id=3, name=Name 3, description=Description 3)
2020-07-27 21:49:44.790 INFO 14016 --- [ main] c.d.s.d.service.TestProducerService : Sending to kafka entitiy: TestEntity(id=4, name=Name 4, description=Description 4)
2020-07-27 21:49:44.791 INFO 14016 --- [ main] c.d.s.d.service.TestProducerService : Sending to kafka entitiy: TestEntity(id=5, name=Name 5, description=Description 5)
2020-07-27 21:49:44.793 INFO 14016 --- [ main] c.d.s.d.service.TestProducerService : Sending to kafka entitiy: TestEntity(id=6, name=Name 6, description=Description 6)
2020-07-27 21:49:44.794 INFO 14016 --- [ main] c.d.s.d.service.TestProducerService : Sending to kafka entitiy: TestEntity(id=7, name=Name 7, description=Description 7)
2020-07-27 21:49:44.795 INFO 14016 --- [ main] c.d.s.d.service.TestProducerService : Sending to kafka entitiy: TestEntity(id=8, name=Name 8, description=Description 8)
2020-07-27 21:49:44.796 INFO 14016 --- [ main] c.d.s.d.service.TestProducerService : Sending to kafka entitiy: TestEntity(id=9, name=Name 9, description=Description 9)
And Consumer:
2020-07-27 21:47:46.351 INFO 13839 --- [container-0-C-1] c.d.s.d.service.TestConsumerService : Received from kafka: {"id":"0","name":"Name 0","description":"Description 0"}
2020-07-27 21:47:46.365 INFO 13839 --- [container-0-C-1] c.d.s.d.service.TestConsumerService : Received from kafka: {"id":"1","name":"Name 1","description":"Description 1"}
2020-07-27 21:47:46.367 INFO 13839 --- [container-0-C-1] c.d.s.d.service.TestConsumerService : Received from kafka: {"id":"2","name":"Name 2","description":"Description 2"}
2020-07-27 21:47:46.367 INFO 13839 --- [container-0-C-1] c.d.s.d.service.TestConsumerService : Received from kafka: {"id":"3","name":"Name 3","description":"Description 3"}
2020-07-27 21:47:46.369 INFO 13839 --- [container-0-C-1] c.d.s.d.service.TestConsumerService : Received from kafka: {"id":"4","name":"Name 4","description":"Description 4"}
2020-07-27 21:47:46.369 INFO 13839 --- [container-0-C-1] c.d.s.d.service.TestConsumerService : Received from kafka: {"id":"5","name":"Name 5","description":"Description 5"}
2020-07-27 21:47:46.370 INFO 13839 --- [container-0-C-1] c.d.s.d.service.TestConsumerService : Received from kafka: {"id":"6","name":"Name 6","description":"Description 6"}
2020-07-27 21:47:46.371 INFO 13839 --- [container-0-C-1] c.d.s.d.service.TestConsumerService : Received from kafka: {"id":"7","name":"Name 7","description":"Description 7"}
2020-07-27 21:47:46.372 INFO 13839 --- [container-0-C-1] c.d.s.d.service.TestConsumerService : Received from kafka: {"id":"8","name":"Name 8","description":"Description 8"}
2020-07-27 21:47:46.372 INFO 13839 --- [container-0-C-1] c.d.s.d.service.TestConsumerService : Received from kafka: {"id":"9","name":"Name 9","description":"Description 9"}
It works! :)
5. The end
As you can see, with Spring Boot we can interact with Kafka on much higher level, so it more simple.
Source code can be downloaded from
here.
No comments:
Post a Comment