Monday, July 27, 2020

Kafka and Spring Boot (Spring Cloud Stream)

0. Intro

We can interact with Kafka in different ways. The default is to use the official Kafka client: we create a Kafka producer and a Kafka consumer, and after that we're ready to go. I had a post on this topic here. But if we're using Spring Boot, we can try a much easier option: Spring Cloud Stream. It is a more "high-level" way compared to the native Kafka client. 

For this demo I created, in the traditional way, 2 applications: a Kafka producer and a Kafka consumer. 
Let's start! 


1. pom.xml 

These files are almost identical for the two projects. The only difference is the artifact name. 



<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.2.RELEASE</version>
        <relativePath/>
    </parent>
    <groupId>com.demien</groupId>
    <artifactId>spboot-kafka-producer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spboot-kafka-producer</name>
    <description>Demo project for Spring Boot and Kafka</description>

    <properties>
        <java.version>11</java.version>
        <spring-cloud.version>Hoxton.SR6</spring-cloud.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-test-support</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>




2. Producer application

This application will be sending messages to Kafka. 

2.1. Entity 

Let's start with the entity class which we will be sending to Kafka. I used Lombok to keep it simple: 

package com.demien.spbootkafkaproducer.domain.model;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;

@Getter
@Setter
@AllArgsConstructor
@ToString
public class TestEntity {
    private String id;
    private String name;
    private String description;
}



2.2. Channel

OK, we have an entity to send; now we have to configure a channel to send it through: 
package com.demien.spbootkafkaproducer.domain.message;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface TestMessageChannels {

    @Output("myTestChannel")
    MessageChannel myTestChannel();
}



- as you can see, it's just an interface: Spring will create an implementation for it at runtime. We just have to define the "id" of the channel, which is "myTestChannel" in this particular case. 
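
By the way, for this common case Spring Cloud Stream already ships a ready-made interface: org.springframework.cloud.stream.messaging.Source. It looks roughly like ours, just with the fixed channel id "output", so we could have used it instead of defining our own:

public interface Source {

    String OUTPUT = "output";

    @Output(Source.OUTPUT)
    MessageChannel output();
}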

2.3. Sender service

Now we can use our channel to send messages: we enable bindings for it, autowire it, and we're ready to go. 

package com.demien.spbootkafkaproducer.domain.service;

import com.demien.spbootkafkaproducer.domain.message.TestMessageChannels;
import com.demien.spbootkafkaproducer.domain.model.TestEntity;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.messaging.Message;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.stereotype.Service;

import java.util.logging.Logger;

@Service("testProducerService")
@EnableBinding(TestMessageChannels.class)
public class TestProducerService {

    private final static Logger log = Logger.getLogger(TestProducerService.class.getName());

    private final TestMessageChannels testMessageChannels;

    @Autowired
    public TestProducerService(TestMessageChannels testMessageChannels) {
        this.testMessageChannels = testMessageChannels;
    }

    public void sendTestMessages() {
        for (int i = 0; i < 10; i++) {
            var idString = Integer.toString(i);
            var testEntity = new TestEntity(idString, "Name " + idString, "Description " + idString);
            log.info("Sending to kafka entity: " + testEntity);
            final Message<TestEntity> message = MessageBuilder.withPayload(testEntity).build();
            testMessageChannels.myTestChannel().send(message);
        }
    }
} 

Not complicated, right? We just wrap our test entity into a Message object and send it to the channel. 
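
As a side note, MessageBuilder can also attach headers before sending. A hypothetical variant of the loop body (the "entityId" header name is made up just for illustration):

final Message<TestEntity> message = MessageBuilder
        .withPayload(testEntity)
        .setHeader("entityId", idString) // custom header, just an example
        .build();
testMessageChannels.myTestChannel().send(message);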

2.4. Application runner

And now we can run our application, let Spring create our service (and autowire the channel), and use it: 

package com.demien.spbootkafkaproducer;

import com.demien.spbootkafkaproducer.domain.service.TestProducerService;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;

@SpringBootApplication
public class SpbootKafkaProducerApplication {

    public static void main(String[] args) {
        final var applicationContext = SpringApplication.run(SpbootKafkaProducerApplication.class, args);
        new SpbootKafkaProducerApplication().run(applicationContext);
    }

    public void run(ApplicationContext applicationContext) {
        TestProducerService testProducerService = applicationContext.getBean(TestProducerService.class);
        testProducerService.sendTestMessages();
    }

}

  

2.5. Application Config

Remember, in #2.2 we defined a channel with the id "myTestChannel"? Now we have to point it to some Kafka topic and, basically, configure Kafka as well. This is done in the application config file (application.yml under the resources folder):

spring:
  application:
    name: test-producer
  cloud:
    stream:
      bindings:
        myTestChannel:
          destination: myTestTopic
      kafka:
        binder:
          zkNodes: localhost
          brokers: localhost


- thus, we will be sending data to the channel with the id "myTestChannel", which is linked to the Kafka topic "myTestTopic".  
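
To double-check that the messages really land in the topic, we can also peek into it with the console consumer shipped with the Kafka distribution (assuming a broker on the default port 9092):

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic myTestTopic --from-beginning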



3. Consumer Application

The Consumer will be receiving messages from Kafka. To make it simpler I omitted the entity class: we will just print the JSON strings with our entities received from Kafka. 

3.1. Input Channel

In a similar way: 
package com.demien.spbootkafkaconsumer.domain.message;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.MessageChannel;

public interface TestMessageChannels {

    @Input("myTestChannel")
    MessageChannel myTestChannel();
} 


3.2 Consumer Service

Also in a similar way, but in addition we have to annotate a method as a "stream listener":

package com.demien.spbootkafkaconsumer.domain.service;

import com.demien.spbootkafkaconsumer.domain.message.TestMessageChannels;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Service;

import java.util.logging.Logger;

@Service
@EnableBinding(TestMessageChannels.class)
public class TestConsumerService {

    private final static Logger log = Logger.getLogger(TestConsumerService.class.getName());

    @StreamListener("myTestChannel")
    public void consumeTestEntity(byte[] testEntity) {
        log.info("Received from kafka: " + new String(testEntity));
    }
}

- as you can see, we define a listener for our channel, and it will be triggered every time we send messages to the Kafka topic linked with this channel. Of course, in real life we would convert the JSON into our entity classes here. 
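
For example, a minimal sketch of such a conversion with Jackson (which Spring Cloud Stream itself uses, so it's already on the classpath), assuming the consumer also had a TestEntity class mirroring the producer's one:

import com.fasterxml.jackson.databind.ObjectMapper;

// ... inside TestConsumerService

private final ObjectMapper objectMapper = new ObjectMapper();

@StreamListener("myTestChannel")
public void consumeTestEntity(byte[] testEntity) throws Exception {
    // Jackson maps the JSON bytes onto our entity class
    final TestEntity entity = objectMapper.readValue(testEntity, TestEntity.class);
    log.info("Received from kafka: " + entity);
}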

3.3. Application runner

It is even simpler than it was for the Producer: 

package com.demien.spbootkafkaconsumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SpbootKafkaConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpbootKafkaConsumerApplication.class, args);
    }

}

3.4. Application configuration

And again we have to link our channel with the Kafka topic. 

spring:
  application:
    name: test-consumer
  cloud:
    stream:
      bindings:
        myTestChannel:
          destination: myTestTopic
          consumer:
            resetOffsets: true
          group: myTestConsumerGroup
      kafka:
        binder:
          zkNodes: localhost
          brokers: localhost




4. Running
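
Both applications expect a Kafka broker on localhost. Assuming the standard Kafka distribution, it can be started with:

bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties

After that we can build both projects with mvn package.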

First we have to run the Consumer: 

java -jar target/spboot-kafka-consumer-0.0.1-SNAPSHOT.jar


And when it has started, the Producer: 

java -jar target/spboot-kafka-producer-0.0.1-SNAPSHOT.jar


The Producer should print something like this: 

2020-07-27 21:49:44.776  INFO 14016 --- [           main] c.d.s.d.service.TestProducerService      : Sending to kafka entity: TestEntity(id=1, name=Name 1, description=Description 1)
2020-07-27 21:49:44.788  INFO 14016 --- [           main] c.d.s.d.service.TestProducerService      : Sending to kafka entity: TestEntity(id=2, name=Name 2, description=Description 2)
2020-07-27 21:49:44.789  INFO 14016 --- [           main] c.d.s.d.service.TestProducerService      : Sending to kafka entity: TestEntity(id=3, name=Name 3, description=Description 3)
2020-07-27 21:49:44.790  INFO 14016 --- [           main] c.d.s.d.service.TestProducerService      : Sending to kafka entity: TestEntity(id=4, name=Name 4, description=Description 4)
2020-07-27 21:49:44.791  INFO 14016 --- [           main] c.d.s.d.service.TestProducerService      : Sending to kafka entity: TestEntity(id=5, name=Name 5, description=Description 5)
2020-07-27 21:49:44.793  INFO 14016 --- [           main] c.d.s.d.service.TestProducerService      : Sending to kafka entity: TestEntity(id=6, name=Name 6, description=Description 6)
2020-07-27 21:49:44.794  INFO 14016 --- [           main] c.d.s.d.service.TestProducerService      : Sending to kafka entity: TestEntity(id=7, name=Name 7, description=Description 7)
2020-07-27 21:49:44.795  INFO 14016 --- [           main] c.d.s.d.service.TestProducerService      : Sending to kafka entity: TestEntity(id=8, name=Name 8, description=Description 8)
2020-07-27 21:49:44.796  INFO 14016 --- [           main] c.d.s.d.service.TestProducerService      : Sending to kafka entity: TestEntity(id=9, name=Name 9, description=Description 9)




And the Consumer: 

2020-07-27 21:47:46.351  INFO 13839 --- [container-0-C-1] c.d.s.d.service.TestConsumerService      : Received from kafka: {"id":"0","name":"Name 0","description":"Description 0"}
2020-07-27 21:47:46.365  INFO 13839 --- [container-0-C-1] c.d.s.d.service.TestConsumerService      : Received from kafka: {"id":"1","name":"Name 1","description":"Description 1"}
2020-07-27 21:47:46.367  INFO 13839 --- [container-0-C-1] c.d.s.d.service.TestConsumerService      : Received from kafka: {"id":"2","name":"Name 2","description":"Description 2"}
2020-07-27 21:47:46.367  INFO 13839 --- [container-0-C-1] c.d.s.d.service.TestConsumerService      : Received from kafka: {"id":"3","name":"Name 3","description":"Description 3"}
2020-07-27 21:47:46.369  INFO 13839 --- [container-0-C-1] c.d.s.d.service.TestConsumerService      : Received from kafka: {"id":"4","name":"Name 4","description":"Description 4"}
2020-07-27 21:47:46.369  INFO 13839 --- [container-0-C-1] c.d.s.d.service.TestConsumerService      : Received from kafka: {"id":"5","name":"Name 5","description":"Description 5"}
2020-07-27 21:47:46.370  INFO 13839 --- [container-0-C-1] c.d.s.d.service.TestConsumerService      : Received from kafka: {"id":"6","name":"Name 6","description":"Description 6"}
2020-07-27 21:47:46.371  INFO 13839 --- [container-0-C-1] c.d.s.d.service.TestConsumerService      : Received from kafka: {"id":"7","name":"Name 7","description":"Description 7"}
2020-07-27 21:47:46.372  INFO 13839 --- [container-0-C-1] c.d.s.d.service.TestConsumerService      : Received from kafka: {"id":"8","name":"Name 8","description":"Description 8"}
2020-07-27 21:47:46.372  INFO 13839 --- [container-0-C-1] c.d.s.d.service.TestConsumerService      : Received from kafka: {"id":"9","name":"Name 9","description":"Description 9"}


It works! :)



5. The end

As you can see, with Spring Boot we can interact with Kafka on a much higher level, so it is much simpler. 
The source code can be downloaded from here



Monday, May 11, 2020

OOP in Python


In this post I'm going to show some basics of OOP in Python.

1. Classes (which are actually objects) and access modifiers

To define a class in Python we should, unsurprisingly, use the keyword class:

class User:

To define a constructor we should use the __init__ method:
def __init__(self, name, age):

The first parameter, self, is a reference to the object instance. The following parameters are the actual constructor parameters. We have 2 of them here, and we should assign them to instance variables:

self.name = name
self.age = age


But what about their visibility? Public? Private? Actually, everything is public. We can use some "hacks" for private fields:
 - we can add a leading underscore - it's just a convention that the field is private; nothing technically stops us from accessing and changing it
 - we can add 2 leading underscores - this is a better option: Python will rename the field using the pattern "_ClassName__fieldName" (name mangling). The field will no longer be visible under its own name, but it will still be reachable under the mangled name I just described.

Let's try:

class User:
    def __init__(self, name, age):
        self.name = name    # public field
        self._name = name   # naive private field
        self.__name = name  # better private field
        self.__age = age    # field defined as "property" with custom SETTER

    def sayHi(self):
        print("Hello, I'm {} and I'm {} years old.".format(self.__name, self.__age))



And now let's play with it. Outputs are shown as comments:

joe = User("Joe", 25)
joe.sayHi() # Hello, I'm Joe and I'm 25 years old.

# playing with public field
print(joe.name) # Joe
joe.name = "Joe1"
print(joe.name) # Joe1

# playing with naive private field
print(joe._name) # Joe
joe._name = "Joe2"
print(joe._name) # Joe2


# playing with better private field
#print(joe.__name) # AttributeError: 'User' object has no attribute '__name'
print(joe._User__name)
joe._User__name = "Joe3"
print(joe._User__name) # Joe3



As you can see, everything is actually public, but using 2 underscores we at least have some protection.
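
We can even see the mangled names directly; a quick check using the built-in vars() (output shown for CPython 3.7+, where insertion order is preserved):

# the instance dictionary shows the mangled names
print(vars(joe))  # {'name': 'Joe1', '_name': 'Joe2', '_User__name': 'Joe3', '_User__age': 25}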

2. Addons provided by decorators

More advanced things are provided by decorators. For example, if we need something like a static method, which of course doesn't have an instance reference (self), we can do it this way:

@staticmethod
def say(msg):
    print("I'm saying:", msg)


Let's try it:
User.say("Yo!") # I'm saying: Yo!


Also, we can use additional decorators to get something like a getter and a setter:


@property
def age(self):
    return self.__age

@age.setter
def age(self, newValue):
    print("age can not be changed!")

That is more interesting from a "modifiers" point of view: we cannot make a variable invisible, but we can at least protect it from being changed!

Let's see:

print(joe.age) # 25
joe.age = 55 # age can not be changed!
print(joe.age) # 25

joe._User__age = 66
print(joe.age) # 66


When we tried to change the field directly, the protection worked fine. But when we used the mangled-name pattern, the protection failed: we successfully changed the value.



3. Inheritance

To extend a class we should provide the parent's name in parentheses:
class SuperUser(User):


To call a method from the parent class we can use super():

class SuperUser(User):
    def __init__(self, name, age, role):
        super().__init__(name, age)
        self.__role = role

    def sayHi(self):
        super().sayHi()
        print(" and my role is: {}".format(self.__role))

Let's try it:

admin = SuperUser("Sysdba", 44, "IT")
admin.sayHi() # Hello, I'm Sysdba and I'm 44 years old.
# and my role is: IT

With inheritance, everything is more or less as expected.
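
In particular, a SuperUser instance is still a User, which we can quickly confirm with the built-ins isinstance and issubclass:

print(isinstance(admin, User))      # True
print(issubclass(SuperUser, User))  # True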

4. The end

Of course, Python is not a purely OOP language, but we can still use some basic OOP stuff.


Friday, April 17, 2020

Implicits in Scala for dummies

The purpose of this post is to demystify implicits in Scala, at least a little bit :)

1. Implicit parameters. Context bounds.

1.1. Implicit parameters

Implicit parameters are most likely the simplest form of implicits. Let's see how they work:

Let's say we have some function:

private def printValue[T](value: T): Unit = {
  println(value)
}

which we can call this way:

printValue(123)
printValue("first")

And at some point we decided that it's better to apply some modification to the data we want to print, something like:

printValue(transform(123))
printValue(transform("first"))

But what if we have a lot of such calls? It may take some time to modify them all! Or maybe we're just too lazy :) As a solution, we can use an implicit parameter here:


trait Transform[T] {
  def transform(value: T): T
}

implicit def intTransform: Transform[Int] = _ * 10 + 1
implicit def stringTransform: Transform[String] = _.toUpperCase

private def printValue[T](value: T)(implicit transformer: Transform[T]): Unit = {
  println(transformer.transform(value))
}

// explicit call
printValue(234)(intTransform)
printValue("first")(stringTransform)

// implicit call
printValue(567)
printValue("second")

First, we defined 2 transformers, for the Int and String types, and marked them as implicit. Second, we added a parameter to our printValue function and marked it as implicit as well. Now, if we want, we can pass these transformers as parameters explicitly. But if we don't, they will be passed implicitly - automatically.

Thus, we don't have to change the code which calls this function, but we can still change the function's behavior.

Execution results:
2341
FIRST

5671
SECOND

1.2 Context bound

Also, starting from Scala 2.8, we can simplify this implicit function a little bit:

// Scala 2.8 allows a shorthand syntax for implicit parameters, called Context Bounds.
private def printValue2[T: Transform](value: T): Unit = {
  val transformer = implicitly[Transform[T]]
  println(transformer.transform(value))
}

printValue2(678)
printValue2("third")


Execution results:
6781
THIRD

This is called a "context bound".
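
Under the hood, [T: Transform] is just syntactic sugar for the extra implicit parameter list from #1.1 - the compiler rewrites printValue2 into roughly this:

// what the compiler generates for the context bound, more or less
private def printValue2Desugared[T](value: T)(implicit transformer: Transform[T]): Unit = {
  println(transformer.transform(value))
}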



2. Implicit type conversion. Pimp my library. View bounds.

2.1. Implicit type conversion


Also implicits can be useful for different type conversions. 

Let's say we have some class, and some function which does something very important with a parameter of this class:

case class StringContainer(value: String) {
  def sayHi(): Unit = println("Hello world!")
}

def printStringContainer(value: StringContainer): Unit = println(s"=== ${value.value.toUpperCase} ===")

Later we created one more class:

case class IntContainer(value: Int)



And now we're thinking that it would be great to use the printStringContainer function for the IntContainer class as well. But it's not possible, because the parameter of this function is of type StringContainer. So we should create a converter from IntContainer to StringContainer.

Let's do it, and let's mark it as implicit:

implicit def intToString(intContainer: IntContainer): StringContainer = intContainer match {
  case IntContainer(1) => StringContainer("One")
  case IntContainer(2) => StringContainer("Two")
  case _ => StringContainer("I don't know")
}


And now the conversion from IntContainer to StringContainer will happen implicitly - we can just use an IntContainer as a StringContainer:

printStringContainer(IntContainer(1))

val intVal: IntContainer = IntContainer(2)
val stringVal: StringContainer = intVal


2.2. Pimp my library pattern

If you remember from the example above, the class StringContainer has the method sayHi():

case class StringContainer(value: String) {
  def sayHi(): Unit = println("Hello world!")
}


IntContainer doesn't have it:
case class IntContainer(value: Int)

But we do have an implicit converter from IntContainer to StringContainer:
implicit def intToString(intContainer: IntContainer): StringContainer = intContainer match {
  case IntContainer(1) => StringContainer("One")
  case IntContainer(2) => StringContainer("Two")
  case _ => StringContainer("I don't know")
}

And now we can do really interesting stuff with IntContainer: on a variable of this type, we can call a method which it doesn't have:

// Pimp My Library
// - we're calling a method from StringContainer on an IntContainer variable:
//   the IntContainer value will be implicitly converted to StringContainer
intVal.sayHi()


This is called the "pimp my library" pattern. We "extended" one class with functionality from another class just by providing an implicit conversion function.

2.3 Conversion chains

We can have several conversions, like A => B => C. Let's see an example:

case class IntContainer(value: Int)

case class StringContainer(value: String)

case class BooleanContainer(value: Boolean) {
  def yesOrNo(): Unit = if (value) println("Yes") else println("No")
}

implicit def intToString(value: IntContainer): StringContainer = value match {
  case IntContainer(1) => StringContainer("One")
  case IntContainer(2) => StringContainer("Two")
  case _ => StringContainer("I don't know")
}

implicit def stringToBooleanType[T](value: T)(implicit toString: T => StringContainer): BooleanContainer = {
  val stringValue = toString(value)
  BooleanContainer(stringValue != null && stringValue.value != null && stringValue.value != "I don't know")
}


val intVal2: IntContainer = IntContainer(2)
val intVal3: IntContainer = IntContainer(3)

// chain conversion: IntContainer to StringContainer to BooleanContainer
val booleanVal2: BooleanContainer = intVal2
val booleanVal3: BooleanContainer = intVal3

// pimp my library
intVal2.yesOrNo()

2.4 View bounds

Sometimes we know that one class can be converted to another (as in the examples above). This is also called a "view bound": a class can be "viewed" as (converted to) another. Let's create an example of a class which works for all types that can be viewed as the StringContainer class. Such a view bound is defined using the [T <% StringContainer] syntax.

// view bound
class StringContainerUtils[T <% StringContainer] {
  def valueLength(stringContainer: T): Int = stringContainer.value.length
}

val stringContainerUtils = new StringContainerUtils[IntContainer]()
stringContainerUtils.valueLength(intVal)

As you can see, we can pass intVal of type IntContainer as a parameter into a function which expects the StringContainer type.
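
A note: the <% syntax is deprecated in newer Scala versions (since 2.11). The equivalent, and nowadays preferred, form is an explicit implicit parameter of function type - this is literally what the compiler desugars the view bound into:

// the same utility without the deprecated <% syntax
class StringContainerUtils2[T](implicit view: T => StringContainer) {
  def valueLength(stringContainer: T): Int = view(stringContainer).value.length
}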


3. Type conversion using implicit classes

Actually, implicit classes correlate with the implicit conversions from the previous chapter.
Such a class should have exactly one constructor parameter, and all variables of that parameter's type will implicitly get the methods of the class. So it's just another form of the "pimp my library" pattern:


// it should have a constructor with exactly one parameter
implicit class IntToString(value: Int) {
  def intToString(): String = value match {
    case 1 => "One"
    case 2 => "Two"
    case _ => "I don't know"
  }
}

println(1.intToString())
println(2.intToString())
println(3.intToString())

- we defined such a class for the Int type, so now all variables/values of the Int type can use the methods of this class.
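
By the way, a common refinement: if we declare the implicit class as a value class (extends AnyVal), the wrapper object is usually not even allocated - a sketch of the same extension in that form:

// same extension as a value class - avoids allocating the wrapper in most cases
implicit class IntToString2(val value: Int) extends AnyVal {
  def intToString2(): String = value match {
    case 1 => "One"
    case 2 => "Two"
    case _ => "I don't know"
  }
}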

We can, of course, have more complicated constructions, like implicit classes with implicit methods:

sealed trait MyContainer
case class IntContainer(value: Int) extends MyContainer
case class StringContainer(value: String) extends MyContainer

trait Info[T <: MyContainer] {
  def showInfo(): String
}

implicit val intTypeInfo: Info[IntContainer] = () => "This is IntType"
implicit val stringTypeInfo: Info[StringContainer] = () => "This is StringType"

// we still should have a constructor parameter even if we don't use it explicitly
implicit class MyTypeTools[T <: MyContainer](value: T) {
  def getInfo(implicit info: Info[T]): String = info.showInfo()
}

val intVal = IntContainer(1)
println(intVal.getInfo)

val stringVal = StringContainer("one")
println(stringVal.getInfo)

4. The end

As you can see, implicits are really great and can make coding much more exciting :)