Showing posts with label spring. Show all posts

Monday, July 27, 2020

Kafka and Spring Boot(Spring Cloud Stream)

0.Intro

We can interact with Kafka in different ways. The default is to use the official Kafka client: we create a Kafka Producer and a Kafka Consumer, and after that we're ready to go. I had a post on this topic here. But if we're using Spring Boot, we can try a much easier option: Spring Cloud Stream. It is a more "high-level" way compared to the native Kafka client.

For this demo I created, in the traditional way, 2 applications: a Kafka Producer and a Kafka Consumer.
Let's start! 


1. pom.xml 

These files are almost the same for the 2 projects. The only difference is the artifact name (the producer's pom.xml is shown below).



<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.2.RELEASE</version>
<relativePath/>
</parent>
<groupId>com.demien</groupId>
<artifactId>spboot-kafka-producer</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>spboot-kafka-producer</name>
<description>Demo project for Spring Boot and Kafka</description>

<properties>
<java.version>11</java.version>
<spring-cloud.version>Hoxton.SR6</spring-cloud.version>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-support</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>

</project>




2. Producer application

This application will be sending messages to Kafka. 

2.1. Entity 

Let's start with the entity class which we will be sending to Kafka. I used Lombok to keep it simple:

package com.demien.spbootkafkaproducer.domain.model;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;

@Getter
@Setter
@AllArgsConstructor
@ToString
public class TestEntity {
    private String id;
    private String name;
    private String description;
}



2.2. Channel

Ok, we have an entity to send, now we have to configure a channel for sending it:

package com.demien.spbootkafkaproducer.domain.message;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface TestMessageChannels {
    @Output("myTestChannel")
    MessageChannel myTestChannel();
}



- as you can see, it's just an interface: Spring will create an implementation for it at runtime. We just have to define the "id" for this channel, which is "myTestChannel" in this particular case.
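To get a feel for what "an implementation created at runtime" can look like, here is a minimal, framework-free sketch based on a JDK dynamic proxy. It only illustrates the general idea; Spring Cloud Stream's own binding machinery is more involved. The names Channel, DemoChannels and RuntimeChannels, and the rule that the method name doubles as the channel id, are assumptions made up for this example.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a message channel: it only records what was sent.
class Channel {
    final String id;
    final List<Object> sent = new ArrayList<>();

    Channel(String id) {
        this.id = id;
    }

    boolean send(Object payload) {
        return sent.add(payload);
    }
}

// Like TestMessageChannels: only an interface, no hand-written implementation.
interface DemoChannels {
    Channel myTestChannel();
}

class RuntimeChannels {

    // Builds an implementation of the given interface at runtime.
    // Assumption for this sketch: the method name doubles as the channel id.
    static <T> T create(Class<T> type) {
        Map<String, Channel> channels = new ConcurrentHashMap<>();
        InvocationHandler handler = (self, method, arguments) -> {
            if (method.getDeclaringClass() == Object.class) {
                // equals/hashCode/toString are not channels; keep the sketch simple.
                throw new UnsupportedOperationException(method.getName());
            }
            // One channel instance per method, created on first use.
            return channels.computeIfAbsent(method.getName(), Channel::new);
        };
        Object instance = Proxy.newProxyInstance(
                type.getClassLoader(), new Class<?>[] {type}, handler);
        return type.cast(instance);
    }

    public static void main(String[] args) {
        DemoChannels channels = create(DemoChannels.class);
        channels.myTestChannel().send("hello");
        System.out.println(channels.myTestChannel().id + " got " + channels.myTestChannel().sent);
    }
}
```

The caller only ever sees the interface, which is the same experience we get with the autowired TestMessageChannels.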

2.3. Sender service

Now we can use our channel for message sending: we should enable bindings for it, autowire it and we're ready to go. 

package com.demien.spbootkafkaproducer.domain.service;

import com.demien.spbootkafkaproducer.domain.message.TestMessageChannels;
import com.demien.spbootkafkaproducer.domain.model.TestEntity;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.messaging.Message;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.stereotype.Service;

import java.util.logging.Logger;

@Service("testProducerService")
@EnableBinding(TestMessageChannels.class)
public class TestProducerService {

    private final static Logger log = Logger.getLogger(TestProducerService.class.getName());

    private final TestMessageChannels testMessageChannels;

    @Autowired
    public TestProducerService(TestMessageChannels testMessageChannels) {
        this.testMessageChannels = testMessageChannels;
    }

    public void sendTestMessages() {
        for (int i = 0; i < 10; i++) {
            var idString = Integer.toString(i);
            var testEntity = new TestEntity(idString, "Name " + idString, "Description " + idString);
            log.info("Sending to kafka entitiy: " + testEntity);
            final Message<TestEntity> message = MessageBuilder.withPayload(testEntity).build();
            testMessageChannels.myTestChannel().send(message);
        }
    }
}

Not complicated, right? We just have to wrap our test entity in a Message object and send it to the channel.

2.4. Application runner

And now we can run our application, let Spring create our service (and autowire the channel), and use it:

package com.demien.spbootkafkaproducer;

import com.demien.spbootkafkaproducer.domain.service.TestProducerService;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;

@SpringBootApplication
public class SpbootKafkaProducerApplication {

    public static void main(String[] args) {
        final var applicationContext = SpringApplication.run(SpbootKafkaProducerApplication.class, args);
        new SpbootKafkaProducerApplication().run(applicationContext);
    }

    public void run(ApplicationContext applicationContext) {
        TestProducerService testProducerService = applicationContext.getBean(TestProducerService.class);
        testProducerService.sendTestMessages();
    }

}

  

2.5. Application Config

Remember the channel with id "myTestChannel" that we defined in #2.2? Now we have to point it to a Kafka topic and, basically, configure Kafka as well. This is done in the application config file under the resources folder (the snippet below is YAML, so it belongs in application.yml rather than application.properties):

spring:
  application:
    name: test-producer
  cloud:
    stream:
      bindings:
        myTestChannel:
          destination: myTestTopic
      kafka:
        binder:
          zkNodes: localhost
          binder:
            brokers: localhost


- thus, we will be sending data to the channel with id "myTestChannel", which is linked to the Kafka topic "myTestTopic".



3. Consumer Application

The consumer will be receiving messages from Kafka. To keep the consumer simple I omitted the entity class: we will just print the JSON representations of our entities received from Kafka.

3.1. Input Channel

In a similar way: 
package com.demien.spbootkafkaconsumer.domain.message;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.MessageChannel;

public interface TestMessageChannels {
    @Input("myTestChannel")
    MessageChannel myTestChannel();
}


3.2 Consumer Service

Also in a similar way, but in addition we have to annotate a method as a "Stream Listener":

package com.demien.spbootkafkaconsumer.domain.service;

import com.demien.spbootkafkaconsumer.domain.message.TestMessageChannels;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Service;

import java.util.logging.Logger;

@Service
@EnableBinding(TestMessageChannels.class)
public class TestConsumerService {

    private final static Logger log = Logger.getLogger(TestConsumerService.class.getName());

    @StreamListener("myTestChannel")
    public void consumeTestEntity(byte[] testEntity) {
        log.info("Received from kafka: " + new String(testEntity));
    }
}

- as you can see, we're defining a listener for our channel, and it will be triggered every time a message is sent to the Kafka topic linked with this channel. Of course, in real life we would convert the JSON into our entity classes here.

3.3. Application runner

It's even simpler than it was for the Producer:

package com.demien.spbootkafkaconsumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SpbootKafkaConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpbootKafkaConsumerApplication.class, args);
    }

}

3.4. Application configuration

And again we have to link our channel with the Kafka topic.

spring:
  application:
    name: test-producer
  cloud:
    stream:
      bindings:
        myTestChannel:
          destination: myTestTopic
          consumer:
            resetOffsets: true
          group: myTestConsumerGroup
      kafka:
        binder:
          zkNodes: localhost
          binder:
            brokers: localhost




4. Running

First we have to run the Consumer:

java -jar target/spboot-kafka-consumer-0.0.1-SNAPSHOT.jar


And once it has started, the Producer:

java -jar target/spboot-kafka-producer-0.0.1-SNAPSHOT.jar


Producer should print something like this: 

2020-07-27 21:49:44.776  INFO 14016 --- [           main] c.d.s.d.service.TestProducerService      : Sending to kafka entitiy: TestEntity(id=1, name=Name 1, description=Description 1)

2020-07-27 21:49:44.788  INFO 14016 --- [           main] c.d.s.d.service.TestProducerService      : Sending to kafka entitiy: TestEntity(id=2, name=Name 2, description=Description 2)

2020-07-27 21:49:44.789  INFO 14016 --- [           main] c.d.s.d.service.TestProducerService      : Sending to kafka entitiy: TestEntity(id=3, name=Name 3, description=Description 3)

2020-07-27 21:49:44.790  INFO 14016 --- [           main] c.d.s.d.service.TestProducerService      : Sending to kafka entitiy: TestEntity(id=4, name=Name 4, description=Description 4)

2020-07-27 21:49:44.791  INFO 14016 --- [           main] c.d.s.d.service.TestProducerService      : Sending to kafka entitiy: TestEntity(id=5, name=Name 5, description=Description 5)

2020-07-27 21:49:44.793  INFO 14016 --- [           main] c.d.s.d.service.TestProducerService      : Sending to kafka entitiy: TestEntity(id=6, name=Name 6, description=Description 6)

2020-07-27 21:49:44.794  INFO 14016 --- [           main] c.d.s.d.service.TestProducerService      : Sending to kafka entitiy: TestEntity(id=7, name=Name 7, description=Description 7)

2020-07-27 21:49:44.795  INFO 14016 --- [           main] c.d.s.d.service.TestProducerService      : Sending to kafka entitiy: TestEntity(id=8, name=Name 8, description=Description 8)

2020-07-27 21:49:44.796  INFO 14016 --- [           main] c.d.s.d.service.TestProducerService      : Sending to kafka entitiy: TestEntity(id=9, name=Name 9, description=Description 9)




And Consumer: 

2020-07-27 21:47:46.351  INFO 13839 --- [container-0-C-1] c.d.s.d.service.TestConsumerService      : Received from kafka: {"id":"0","name":"Name 0","description":"Description 0"}

2020-07-27 21:47:46.365  INFO 13839 --- [container-0-C-1] c.d.s.d.service.TestConsumerService      : Received from kafka: {"id":"1","name":"Name 1","description":"Description 1"}

2020-07-27 21:47:46.367  INFO 13839 --- [container-0-C-1] c.d.s.d.service.TestConsumerService      : Received from kafka: {"id":"2","name":"Name 2","description":"Description 2"}

2020-07-27 21:47:46.367  INFO 13839 --- [container-0-C-1] c.d.s.d.service.TestConsumerService      : Received from kafka: {"id":"3","name":"Name 3","description":"Description 3"}

2020-07-27 21:47:46.369  INFO 13839 --- [container-0-C-1] c.d.s.d.service.TestConsumerService      : Received from kafka: {"id":"4","name":"Name 4","description":"Description 4"}

2020-07-27 21:47:46.369  INFO 13839 --- [container-0-C-1] c.d.s.d.service.TestConsumerService      : Received from kafka: {"id":"5","name":"Name 5","description":"Description 5"}

2020-07-27 21:47:46.370  INFO 13839 --- [container-0-C-1] c.d.s.d.service.TestConsumerService      : Received from kafka: {"id":"6","name":"Name 6","description":"Description 6"}

2020-07-27 21:47:46.371  INFO 13839 --- [container-0-C-1] c.d.s.d.service.TestConsumerService      : Received from kafka: {"id":"7","name":"Name 7","description":"Description 7"}

2020-07-27 21:47:46.372  INFO 13839 --- [container-0-C-1] c.d.s.d.service.TestConsumerService      : Received from kafka: {"id":"8","name":"Name 8","description":"Description 8"}

2020-07-27 21:47:46.372  INFO 13839 --- [container-0-C-1] c.d.s.d.service.TestConsumerService      : Received from kafka: {"id":"9","name":"Name 9","description":"Description 9"}


It works! :)



5. The end

As you can see, with Spring Boot we can interact with Kafka on a much higher level, which makes things simpler.
Source code can be downloaded from here



Sunday, June 23, 2019

Reactive web application with SpringBoot

0. Intro

The event-driven asynchronous approach is getting more and more popular.
When we're talking about REACTIVE we mean that we should react somehow to something. When it comes to web applications, most often that means: we should react to some event by showing new information on the page.

With the traditional approach, a web page has to poll (send request, receive response) some REST service every time it wants to get a new portion of data:




With the reactive approach we don't have to poll, we just have to subscribe to some "event stream".
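The difference between the two approaches can be sketched without any framework. In the hypothetical EventStream class below (it is not part of Spring or Reactor, just an illustration), latest() stands for polling, where the client has to keep asking, and subscribe() stands for the reactive style, where the source pushes every new value to its listeners.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical event source used only to contrast polling with subscribing.
class EventStream<T> {
    private final List<Consumer<T>> subscribers = new CopyOnWriteArrayList<>();
    private volatile T latest;

    // Polling style: the client has to ask again and again.
    T latest() {
        return latest;
    }

    // Reactive style: the client registers once and is called on every event.
    void subscribe(Consumer<T> subscriber) {
        subscribers.add(subscriber);
    }

    // Stores the event and pushes it to every registered subscriber.
    void emit(T event) {
        latest = event;
        subscribers.forEach(s -> s.accept(event));
    }
}

class EventStreamDemo {
    public static void main(String[] args) {
        EventStream<Integer> numbers = new EventStream<>();
        List<Integer> received = new ArrayList<>();
        numbers.subscribe(received::add);

        numbers.emit(1);
        numbers.emit(2);
        numbers.emit(3);

        // A client that polls only once sees a single value;
        // the subscriber has seen every event.
        System.out.println("polled once: " + numbers.latest());
        System.out.println("pushed to subscriber: " + received);
    }
}
```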

 


1. SpringBoot reactive web application

Let's now create a "reactive" Spring Boot application.

build.gradle:

plugins {
 id 'org.springframework.boot' version '2.1.5.RELEASE'
 id 'java'
}

apply plugin: 'io.spring.dependency-management'

group = 'com.demien'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '1.8'

configurations {
 developmentOnly
 runtimeClasspath {
  extendsFrom developmentOnly
 }
}

repositories {
 mavenCentral()
}

dependencies {
 implementation 'org.springframework.boot:spring-boot-starter-webflux'
 developmentOnly 'org.springframework.boot:spring-boot-devtools'
 testImplementation 'org.springframework.boot:spring-boot-starter-test'
 testImplementation 'io.projectreactor:reactor-test'
}

- we have to add the "webflux" dependency here.



Main application runner: 

package com.demien.reactweb;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class ReactwebApplication {

 public static void main(String[] args) {
  SpringApplication.run(ReactwebApplication.class, args);
 }

}


- nothing interesting here, just the start of the Spring Boot application.

Reactive rest controller: 

package com.demien.reactweb;

import java.time.Duration;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import reactor.core.publisher.Flux;

@RestController
public class RandomNumberController {

 private final Log log = LogFactory.getLog(RandomNumberController.class);

 @RequestMapping("/random")
 public Flux<Integer> random() {
  return Flux.interval(Duration.ofSeconds(5)).map(i -> {
   this.log.info("iteration:" + i);
   return generateRandomNumber();
  }).log();
 }

 private int generateRandomNumber() {
  return (int) (Math.random() * 1000);
 }

}

- the magic happens here! First of all, we're returning Flux<Integer>, which means we're actually returning a "stream" of integers. Second, we're using an interval of 5 seconds between emitted elements.

2. HTML Web page

Here we have just one button. But when we press it, it subscribes us to the events coming from the REST endpoint "/random" which we created in the previous step. We also define the "handler" (stringEventSource.onmessage), which says what to do when a new event arrives: we just add one more list item.


<html>

<head>

    <script>
        function registerEventSourceAndAddResponseTo(uri, elementId) {
            var stringEvents = document.getElementById(elementId);
            var stringEventSource = new EventSource(uri);
            stringEventSource.onmessage = function (e) {
                var newElement = document.createElement("li");
                newElement.innerHTML = e.data;
                stringEvents.appendChild(newElement);
            }
        }

        function subscribe() {
            registerEventSourceAndAddResponseTo("http://localhost:8080/random","display");
        }

    </script>

</head>

<body>

    <p>
        <button id="subscribe-button" onclick="subscribe()">Subscribe to random numbers</button>
        <ul id="display"></ul>
    </p>


</body>

</html>
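EventSource reads a text/event-stream response, in which every event is a "data:" line followed by a blank line, and the text after "data:" is what arrives in e.data. The sketch below is a framework-free illustration of that wire format for simple single-line events; the SseFrames class and its methods are made up for the example and are not how WebFlux is implemented.

```java
import java.util.ArrayList;
import java.util.List;

// Framework-free sketch of the text/event-stream wire format.
class SseFrames {

    // One event carrying a single data line, terminated by a blank line.
    static String frame(int value) {
        return "data:" + value + "\n\n";
    }

    // Extracts the data payloads from a stream of simple, single-line events.
    static List<String> parse(String stream) {
        List<String> payloads = new ArrayList<>();
        for (String line : stream.split("\n")) {
            if (line.startsWith("data:")) {
                String data = line.substring("data:".length());
                // The format allows one optional space after the colon.
                payloads.add(data.startsWith(" ") ? data.substring(1) : data);
            }
        }
        return payloads;
    }

    public static void main(String[] args) {
        String stream = frame(417) + frame(52);
        System.out.println(parse(stream));
    }
}
```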

3. Let's run it! 

I'm running my springBoot application and opening the web page:


Nothing has happened so far. We have to press the button to subscribe to new events. Let's press it:



And now it's much better! Events are coming from server side and UI shows them!

4. The end. 

Full source code can be downloaded from here.

Wednesday, June 12, 2019

Microservices with Spring Boot 2

0. Intro

This is a kind of refresh of one of my previous posts: "Microservices with spring boot".
Updates:
- codebase migrated to Spring Boot 2.1.4
- used "feign client" instead of "rest template" for micro-services interaction
- used "zipkin" and "sleuth.sampler" for application monitoring

I will not be providing a lot of details, as they are in my previous post. DRY - don't repeat yourself :)

1. Architecture

Actually, the architecture is the same as in the previous post. The client communicates with 3 micro-services:





But apart from them, we also have some "infrastructure micro-services":



The next picture is taken from zipkin and shows the micro-services communication flow: we call the edge-server and provide it with the details of the service we actually want to call. Below, the edge-server calls cart-service (method cart/test), which calls user-service through the edge-server 2 times (user/login and user/byToken) and then, again through the edge-server, item-service (item/getAll):


2. Discovery server 

The same stuff as in my previous post - I'm using Eureka:



3. Micro-services interaction

As I mentioned at the beginning, I'm using "feign client" for micro-services interaction. I'll show it on example of "cart-service" - it should call "user-service" to get user details and also it should get some items details from "item-service".

3.1. Properties

file application.properties:

spring.application.name=cart-service
server.port=8100
eureka.client.serviceUrl.defaultZone=http://localhost:8761/eureka

- here we're defining the "coordinates" of our discovery service, to get information about the services we may need (user and item services)

file bootstrap.properties:

spring.zipkin.base-url=http://localhost:9411/
spring.sleuth.sampler.probability=1

- here we're defining the "coordinates" of the zipkin application: we'll be sending request traces there.

3.2 Feign clients 

In the feign clients we specify our "edge-server" as the target. We also define a "ribbon client" for the service we want to call: there can be several instances of one service, so the ribbon client is needed for load-balancing. And finally, in the methods, we define the exact REST endpoints we want to call.
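To make "load-balancing between instances" more concrete, here is a minimal sketch of client-side round-robin selection. It is only one simple strategy and is not taken from Ribbon's code; which rule Ribbon actually applies in this project is not shown in the post. The RoundRobinBalancer class and the instance addresses are invented for the example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical client-side load balancer: rotates over the known instances.
class RoundRobinBalancer {
    private final List<String> instances;
    private final AtomicInteger next = new AtomicInteger();

    RoundRobinBalancer(List<String> instances) {
        if (instances.isEmpty()) {
            throw new IllegalArgumentException("at least one instance is required");
        }
        this.instances = instances;
    }

    // Picks the next instance; floorMod keeps the index valid after int overflow.
    String choose() {
        int index = Math.floorMod(next.getAndIncrement(), instances.size());
        return instances.get(index);
    }

    public static void main(String[] args) {
        // The addresses are made up for the example.
        RoundRobinBalancer balancer = new RoundRobinBalancer(
                Arrays.asList("localhost:8101", "localhost:8102"));
        for (int i = 0; i < 4; i++) {
            System.out.println("item-service call " + i + " -> " + balancer.choose());
        }
    }
}
```

In a real setup the list of instances would come from the discovery server instead of being hard-coded.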

Item service feign client: 

package com.demien.sprcloud.cartservice.controller;
import java.util.List;

import org.springframework.cloud.netflix.ribbon.RibbonClient;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;

import com.demien.sprcloud.cartservice.domain.Item;

@FeignClient(contextId = "itemClient", name = "edge-server")
@RibbonClient(name = "item-service")
public interface ItemServiceProxy {

 @RequestMapping(value = "/item-service/item/{itemId}", method = RequestMethod.GET)
 public Item getById(@PathVariable("itemId") String itemId);

 @RequestMapping(value = "/item-service/item/getAll", method = RequestMethod.GET)
 public List<Item> getAll();

}



User service feign client: 

package com.demien.sprcloud.cartservice.controller;

import org.springframework.cloud.netflix.ribbon.RibbonClient;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;

@FeignClient(contextId = "userClient", name = "edge-server")
@RibbonClient(name = "user-service")
public interface UserServiceProxy {

 @RequestMapping(value = "/user-service/user/login", method = RequestMethod.POST)
 public String login(@RequestParam("userId") String userId, @RequestParam("userPassword") String userPassword);

 @RequestMapping(value = "/user-service/user/byToken/{tokenId}", method = RequestMethod.GET)
 public String userByToken(@PathVariable("tokenId") String tokenId);

}

3.3. Test method with interaction 

The next method just emulates a process where a user logs in and adds some items to the cart: our cart-service will be communicating with the item and user services. Now we can "autowire" our feign clients and just call them! The calls are the lines that use userServiceProxy and itemServiceProxy.

 @Autowired
 private UserServiceProxy userServiceProxy;

 @Autowired
 private ItemServiceProxy itemServiceProxy;

 public String getDefaultResponse() {
  return "Something is wrong. Please try again later";

 }

 @HystrixCommand(fallbackMethod = "getDefaultResponse")
 @RequestMapping(method = RequestMethod.GET, value = "/test")
 public String test() {
  final StringBuilder result = new StringBuilder();
  result.append("Logging in into userService as user1/pasword <br/> ");
  final String tokenId = this.userServiceProxy.login("user1", "password1");
  result.append("Received Token: " + tokenId + "<br/><br/>");
  result.append("Getting user details from userService by token <br/>");
  final String userDetails = this.userServiceProxy.userByToken(tokenId);
  result.append("Reseived UserDetails: " + userDetails + "<br/><br/>");

  result.append("Getting item list from itmService <br/>");
  final List<Item> items = this.itemServiceProxy.getAll();
  result.append("Reseived items: <br/>");
  items.forEach(item -> result.append("    " + item.toString() + " <br/>"));

  return result.toString();
 }

Now, to test it, we can open this URL in a browser: http://localhost:8765/cart-service/cart/test
and the result should be:


Logging in into userService as user1/pasword 
Received Token: ec1a5d78-a8c5-4392-90bb-cbca7d8c9244

Getting user details from userService by token
Reseived UserDetails: {"id":"user1","name":"First User","address":"First Address"}

Getting item list from itmService
Reseived items:
Item(itemId=I6S, itemName=IphoNovatekne 6s, price=400.00)
Item(itemId=I7, itemName=Iphone 7, price=500.00)
Item(itemId=N5, itemName=Samsung galaxy note 5, price=450.00) 



- so we successfully called 2 micro-services!
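The test method is also annotated with @HystrixCommand(fallbackMethod = "getDefaultResponse"), so if one of the calls fails, the caller gets the default response instead of an error. Below is a framework-free sketch of just that "fallback on failure" idea. Hystrix itself does more (timeouts, circuit breaking), which is left out here, and the Fallbacks class and callWithFallback method are invented for the example.

```java
import java.util.function.Supplier;

// Hypothetical helper showing only the "fallback on failure" idea.
class Fallbacks {

    // Runs the primary call; if it throws, returns the fallback's value instead.
    static <T> T callWithFallback(Supplier<T> primary, Supplier<T> fallback) {
        try {
            return primary.get();
        } catch (RuntimeException e) {
            return fallback.get();
        }
    }

    static String getDefaultResponse() {
        return "Something is wrong. Please try again later";
    }

    public static void main(String[] args) {
        // Successful call: the primary result is returned.
        String ok = callWithFallback(() -> "Received items", Fallbacks::getDefaultResponse);

        // Failing call: the exception is swallowed and the default is returned.
        String failed = callWithFallback(() -> {
            throw new IllegalStateException("user-service is down");
        }, Fallbacks::getDefaultResponse);

        System.out.println(ok);
        System.out.println(failed);
    }
}
```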


4. Zipkin 

Zipkin lives here. It's a distributed tracing system. To use it I just downloaded the JAR file and started it with "java -jar zipkinFileName.jar"

My micro-services are already configured to send trace information to zipkin (bootstrap.properties at #3.1). So once zipkin is started, it's possible to monitor them:




Now we can drill down into the details and find the picture I showed at the beginning:
I think this picture makes much more sense now: it's a "map" of the execution of my test REST service from #3.3.


5. The end. 

As I mentioned at the beginning, a lot of details were omitted, because they are present in my previous post. Full source code can be downloaded from here.