Wednesday, October 16, 2019

PySpark getting started

In this post I'm going to show the first steps for working with the following components of PySpark: RDDs and DataFrames.

To start working with PySpark we have 2 options:
-  the PySpark shell from the Spark distribution
-  setting up a dev environment on our own

Let's take a closer look at both of them.

Option 1: Spark-shell

The simplest way to play with PySpark is to use the PySpark shell. First you have to download Spark from the official web page (https://spark.apache.org/downloads.html). Next, unpack it and run "pyspark" from the "bin" folder. You should see something like this:

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.4.4
      /_/

Using Python version 2.7.15+ (default, Oct  7 2019 17:39:04)
SparkSession available as 'spark'.


And we're ready to go! Let's try some basic operations:


>>> rdd1 = spark.sparkContext.parallelize([('Joe',1980), ('Huan',1978), ('Max', 1985) ])

>>> rdd1.take(1)
[('Joe', 1980)]

>>> rdd1.collect()
[('Joe', 1980), ('Huan', 1978), ('Max', 1985)]

>>> rdd1.collect()[1]
('Huan', 1978)

It's definitely enough for "playing" with PySpark, but for complex applications it's not an option: we need to set up a dev environment.

Option 2: setup PySpark dev env


Python setup

On my Ubuntu machine I have both Python 2 and 3 installed, but the default is 2. To use Python 3 I added the following line to my bash profile file ~/.bashrc:
alias python=python3

Let's check:
~$ python --version
Python 3.6.8

Great! 

Pip3 

Next, let's install pip3: 
sudo apt-get -y install python3-pip

For me, pip3 was not working after installation, so I had to modify the file /usr/bin/pip3:

#from pip import main
from pip import __main__
if __name__ == '__main__':
#    sys.exit(main())
    sys.exit(__main__._main())

And let's check it: 
~$ pip3 --version
pip 19.2.3 from /home/dmitry/.local/lib/python3.6/site-packages/pip (python 3.6)

It works! 

Pipenv

Now let's install pipenv using just installed pip3:
~$ sudo pip3 install pipenv
Installing collected packages: pipenv
Successfully installed pipenv-2018.11.26

Dev env

And now we're finally ready to go! 

Next let's create a folder for our test application:

mkdir pyspark-test
cd pyspark-test

Now let's create a virtual env using pipenv: 
pyspark-test$ pipenv shell
Creating a virtualenv for this project…
✔ Successfully created virtual environment! 


Next, we can try to add some dependencies:
(pyspark-test) pyspark-test$ pipenv install pyspark
Installing pyspark…
✔ Success! 
Updated Pipfile.lock (1869ad)!
Installing dependencies from Pipfile.lock (1869ad)…

Let's check:
(pyspark-test) pyspark-test$ pip3 freeze
py4j==0.10.7
pyspark==2.4.4

- looks good: all dependencies are in place.

Finally, everything is prepared for coding! I created a folder "src" with a file "main.py" inside with the following content:

from pyspark.sql import SparkSession

if __name__ == '__main__':
        spark = SparkSession.builder.appName("MyTestApp").getOrCreate()

        rdd1 = spark.sparkContext.parallelize([('Joe',1980), ('Huan',1978), ('Max', 1985) ])
        print("Count: ", rdd1.count())
        print("First: ", rdd1.take(1))




Let's run it now:

(pyspark-test) pyspark-test$ python ./src/main.py 

Count:  3                                                                       
First:  [('Joe', 1980)]

Cool! Our environment is ready for coding! 


RDD

First, it's better to read about RDDs on the official web page: https://spark.apache.org/docs/latest/rdd-programming-guide.html

In short:
The main abstraction Spark provides is a resilient distributed dataset (RDD), which is a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel. RDDs are created by starting with a file in the Hadoop file system (or any other Hadoop-supported file system), or an existing Scala collection in the driver program, and transforming it. Users may also ask Spark to persist an RDD in memory, allowing it to be reused efficiently across parallel operations. Finally, RDDs automatically recover from node failures.
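
The quote above mentions persisting an RDD in memory so it can be reused across parallel operations. Just to illustrate that point, here is a minimal sketch (the app name and the numbers are only placeholders, not part of the original example):

from pyspark import StorageLevel
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("PersistDemo").getOrCreate()
sc = spark.sparkContext

squares = sc.parallelize(range(1000)).map(lambda x: x * x)
squares.persist(StorageLevel.MEMORY_ONLY)  # same effect as squares.cache() for an RDD

print(squares.count())  # the first action computes the RDD and caches its partitions
print(squares.sum())    # later actions reuse the cached data instead of recomputing it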



In the code provided below I'm showing operations which can be performed on RDDs:
- RDD creation: in memory and by reading from a file
- RDD actions: count, take
- RDD transformations: map, flatMap, filter


from pyspark.sql import SparkSession

def main():
    # init spark
    spark = SparkSession.builder.appName("MyTestApp").getOrCreate()
    sc = spark.sparkContext

    # create rdd "in-memory"
    rdd1 = sc.parallelize([('Joe', 1980), ('Huan', 1978), ('Max', 1985)])
    print("RDD1 Count: ", rdd1.count()) #RDD1 Count:  3 
    print("RDD1 First: ", rdd1.take(1)) #RDD1 First:  [('Joe', 1980)]

    # create rdd by reading from file
    rdd2 = sc.textFile("test/test-data/groups.csv")
    print("RDD2 All: ", rdd2.take(10)) #RDD2 All:  ['101,Admin', '102,Dev', '103,DB']

    # RDD transformation using MAP method
    rdd2formatted = rdd2 \
        .map(lambda line: line.split(",")) \
        .map(lambda arr: (arr[0], arr[1]))
    print("RDD2Formatted All: ", rdd2formatted.take(10)) 
    #RDD2Formatted All:  [('101', 'Admin'), ('102', 'Dev'), ('103', 'DB')]

    # RDD filtering using FILTER method
    rdd2filtered = rdd2formatted.filter(lambda row: int(row[0]) > 101)
    print("RDD2Filtered All: ", rdd2filtered.take(10))
    #RDD2Filtered All:  [('102', 'Dev'), ('103', 'DB')]

    # Flattening RDD
    rdd2FilteredFlat = rdd2filtered.flatMap(lambda row: (row[0], row[1]))
    print("RDD2FilteredFlat All: ", rdd2FilteredFlat.take(10))
    #RDD2FilteredFlat All:  ['102', 'Dev', '103', 'DB']


if __name__ == '__main__':
    main()

P.S. The content of "test/test-data/groups.csv" is the following:
101,Admin
102,Dev
103,DB
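
By the way, the difference between map and flatMap (used for the "flattening" step above) is easier to see on a tiny example. A minimal sketch, assuming sc is the SparkContext created in the code above:

lines = sc.parallelize(["101,Admin", "102,Dev"])

# map produces exactly one output element per input element, so we get a list of lists
print(lines.map(lambda s: s.split(",")).collect())
# [['101', 'Admin'], ['102', 'Dev']]

# flatMap flattens the returned sequences into a single list of elements
print(lines.flatMap(lambda s: s.split(",")).collect())
# ['101', 'Admin', '102', 'Dev']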



DataFrame

First, I would again suggest reading the official documentation: https://spark.apache.org/docs/latest/sql-programming-guide.html.
In short, if you know what an RDD is, it's very easy to understand what a DataFrame is: it's an RDD + a schema, where the schema is information about field names and types. If we have structured data like JSON or CSV, we can just read it with Spark and it will take the schema from the files (in the case of CSV, from the header).

A DataFrame is a Dataset organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. 
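
To see which schema Spark actually inferred, or to provide one explicitly instead, something like the following can be used. This is a minimal sketch (it reuses the depts.csv file from the example below; the inferSchema option is optional and costs an extra pass over the data):

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

spark = SparkSession.builder.appName("SchemaDemo").getOrCreate()

# let Spark take column names from the header and infer the types from the data
df_inferred = spark.read.option("header", True).option("inferSchema", True) \
    .csv("test/test-data/depts.csv")
df_inferred.printSchema()

# or declare the schema explicitly and skip the inference pass
schema = StructType([
    StructField("depId", IntegerType(), True),
    StructField("depName", StringType(), True),
])
df_explicit = spark.read.option("header", True).schema(schema) \
    .csv("test/test-data/depts.csv")
df_explicit.printSchema()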


Below I'm showing how to create DataFrames, some basic operations, and how to join them. The most interesting thing here: there are two ways of doing data transformations:
- using Spark API
- using Spark SQL

from pyspark.sql import SparkSession

def main():
    # init spark
    spark = SparkSession.builder.appName("MyTestApp").getOrCreate()
    sc = spark.sparkContext

    # create DataFrame from RDD
    rdd1 = sc.parallelize((
    """{"id": "1001", "name": "Joe",  "depId": 101}""",
    """{"id": "1002", "name": "Huan", "depId": 102}"""
    """{"id": "1003", "name": "Max",  "depId": 103}"""
    ))
    print(rdd1.take(2)) 
    # ['{"id": "1001", "name": "Joe",  "depId": 101}', '{"id": "1002", "name": "Huan", "depId": 102}']    
    df1 = spark.read.json(rdd1)
    df1.show()
    # +-----+----+----+
    # |depId|  id|name|
    # +-----+----+----+
    # |  101|1001| Joe|
    # |  102|1002|Huan|
    # |  103|1003| Max|
    # +-----+----+----+

    # Basic operations using Spark API
    df1.select("name""depId").where("id='1001'").show()
    #+----+-----+
    #|name|depId|
    #+----+-----+
    #| Joe|  101|
    #+----+-----+    

    # Basic operation using Spark SQL
    df1.createOrReplaceTempView("users")
    spark.sql("SELECT concat(name,' has id>1001') as user_name FROM users WHERE id>1001").show()
    #+----------------+
    #|       user_name|
    #+----------------+
    #|Huan has id>1001|
    #| Max has id>1001|
    #+----------------+

    # create DataFrame by reading from file
    df2 = spark.read.option("header",True).csv("test/test-data/depts.csv")
    df2.show()
    #+-----+-------+
    #|depId|depName|
    #+-----+-------+
    #|  101|  Admin|
    #|  102|    Dev|
    #|  103|     DB|
    #+-----+-------+

    # join 2 DataFrames using Spark API
    joined1 = df1.join(df2, df1.depId==df2.depId).drop(df2.depId)
    joined1.show()
    #+-----+----+----+-------+
    #|depId|  id|name|depName|
    #+-----+----+----+-------+
    #|  101|1001| Joe|  Admin|
    #|  102|1002|Huan|    Dev|
    #|  103|1003| Max|     DB|
    #+-----+----+----+-------+    

    # join 2 DataFrames using Spark SQL
    df2.createOrReplaceTempView("deps")
    joined2 = spark.sql("SELECT u.id, u.name, d.depName FROM users u, deps d WHERE u.depId = d.depId")
    joined2.show()
    #+----+----+-------+
    #|  id|name|depName|
    #+----+----+-------+
    #|1001| Joe|  Admin|
    #|1002|Huan|    Dev|
    #|1003| Max|     DB|
    #+----+----+-------+





if __name__ == '__main__':
    main()
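
Just as a side note (a minimal sketch, assuming the df1 DataFrame from the code above): the same filtering can also be written with column expressions instead of SQL-style strings, which composes nicely for more complex conditions:

from pyspark.sql.functions import col

# equivalent to df1.select("name", "depId").where("id='1001'")
df1.select("name", "depId").where(col("id") == "1001").show()

# column expressions can be combined with & and |
df1.where((col("depId") > 101) & (col("id") != "1003")).show()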





The end

And that is basically it. Of course it's impossible to show all RDD and DataFrame stuff in one post: it would be a book - so I just tried to show some very basic stuff for understanding "who is who" :)

Monday, September 23, 2019

Apache Hive getting started


In this post I'm going to explain the installation of Apache Hive and some first steps.

Download

Of course, first we have to download everything: Hive and Hadoop (Hive will not work without it). For me, the closest download locations were:

http://ftp.man.poznan.pl/apache/hive/hive-3.1.2/apache-hive-3.1.2-bin.tar.gz
http://ftp.man.poznan.pl/apache/hadoop/common/hadoop-3.2.0/hadoop-3.2.0.tar.gz

If you don't have Java installed, you should install it as well.

Unpack and env setup

Now we have to unpack the downloaded archives somewhere. I unpacked them into the folders:
/home/dmitry/Develop/hadoop
/home/dmitry/Develop/hive

Next we have to create a folder for hive warehouse:
$ sudo mkdir -p /user/hive/warehouse
$ sudo chmod a+rwx /user/hive
$ sudo chmod a+rwx /user/hive/warehouse


To set up the environment variables we can create a file hadoop-env.sh:

#!/bin/sh

export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64/
export HADOOP_HOME=/home/dmitry/Develop/hadoop
export HIVE_HOME=/home/dmitry/Develop/hive

export PATH=$PATH:$HADOOP_HOME/bin:$HIVE_HOME/bin

Please make sure JAVA_HOME points to proper location. 

To put all these variables into current session, run in terminal:
$source ./hadoop-env.sh

Another option is to add everything (the content of the file above) to $HOME/.bashrc.


Checking if everything configured properly

Actually, Hadoop is not configured and the HDFS daemons are not started, but this is not needed for Hive. We can check it this way:

Hadoop test: 
dmitry@dmitry-ThinkPad-X220:~/Develop$ hdfs dfs -ls /home
Found 3 items                                                                                                                         
-rw-r--r--   1 root   root           31 2018-04-09 19:28 /home/.directory                                                            
drwxr-xr-x   - dmitry dmitry       4096 2019-09-06 21:58 /home/dmitry                                                                 
drwx------   - root   root        16384 2019-08-29 21:11 /home/lost+found 

It shows my files, so it works.

Hive test:

First we have to initialize the metastore database:
dmitry@dmitry-ThinkPad-X220:~/Develop$ cd $HIVE_HOME
dmitry@dmitry-ThinkPad-X220:~/Develop/hive$ schematool -initSchema -dbType derby

At the end of execution it should print:
Initialization script completed
schemaTool completed

And now we can run hive CLI and try to create a table:


dmitry@dmitry-ThinkPad-X220:~/Develop$ hive
                   
Hive Session ID = a62b7d5e-1955-483d-973a-f7416626ebf8                                                                              
                                                                                                                                      
Logging initialized using configuration in jar:file:/home/dmitry/Develop/hive-3.1.2/lib/hive-common-3.1.2.jar!/hive-log4j2.properties Async: true
Hive-on-MR is deprecated in Hive 2 and may not be available in the future versions. Consider using a different execution engine (i.e. spark, tez) or using Hive 1.X releases.
hive> 

If everything is ok it should be possible to create a table:
hive> CREATE TABLE TEST(ID INT);
OK
Time taken: 0.233 seconds
hive>

Now we can try to insert new values into the just-created table:
insert into test values(1);

And checking:
hive> select * from test;
OK
1

The value was returned, but as you can see the column header is disabled by default. Let's enable it:
hive>  set hive.cli.print.header=true;
hive> select * from test;
OK
test.id
1

If you always prefer seeing the headers, put the first line in your $HOME/.hiverc file.


Work with databases

By default, Hive uses the "default" database:
hive> show databases;
OK
database_name
default

But we can always create our own if needed:
create database mydb comment 'my test db';

Now we have:

hive> show databases;
OK
database_name
default
mydb
Time taken: 0.027 seconds, Fetched: 2 row(s)

To get the details: 
hive> describe database mydb;
OK
db_name comment location        owner_name      owner_type      parameters
mydb    my test db      file:/user/hive/warehouse/mydb.db       dmitry  USER
Time taken: 0.036 seconds, Fetched: 1 row(s)

To switch database we can use:
hive> use mydb;
OK
Time taken: 0.031 seconds

To check tables:
hive> show tables;
OK
tab_name
Time taken: 0.039 seconds


Also, we can configure Hive to print the database name in the prompt:
hive>  set hive.cli.print.current.db=true;
hive (mydb)> 
- and instead of "hive" we will have "hive (database name)"


Work with tables:  Internal(Managed) Tables

Managed Tables
The tables we have created so far are called managed tables or sometimes called internal tables, because Hive controls the lifecycle of their data (more or less). As we've seen, Hive stores the data for these tables in a subdirectory under the directory defined by hive.metastore.warehouse.dir (e.g., /user/hive/warehouse) by default. When we drop a managed table, Hive deletes the data in the table.

The syntax of table creation is similar to regular databases:
hive (mydb)> CREATE TABLE IF NOT EXISTS users (
           >   user_name  STRING COMMENT 'Name',
           >   user_roles  ARRAY<STRING> COMMENT 'Roles',
           >   user_address STRUCT<city:STRING, street:STRING, zip:INT> COMMENT 'Address'
           > )
           > COMMENT 'My Table'
           > TBLPROPERTIES ('creator'='Dmitry');


Work with tables:  External Tables

The EXTERNAL keyword tells Hive this table is external and the LOCATION ... clause is
required to tell Hive where it’s located. Because it’s external, Hive does not assume it owns the data. Therefore, dropping the table does not delete the data, although the metadata for the table will be deleted.

Let's say we have a folder:
/user/hive/groups

We can create a CSV file here with the following content (the name of the file doesn't matter):
1,Sysdba
2,Dev
3,Others

And now we can create an external table:

CREATE EXTERNAL TABLE IF NOT EXISTS groups (
group_id INT,
group_name STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION '/user/hive/groups';

Let's check it:
hive (mydb)> select * from groups;
OK
groups.group_id groups.group_name
1       Sysdba
2       Dev
3       Others
Time taken: 0.133 seconds, Fetched: 3 row(s)

To understand "who is who" (internal table or external) we can use command "describe extended":

hive (mydb)> describe extended groups;
OK
col_name        data_type       comment
group_id                int                                         
group_name              string                                      
                 
Detailed Table Information      Table(tableName:groups, dbName:mydb, owner:dmitry, createTime:1568231912, lastAccessTime:0, retention:0, sd:StorageDescriptor(cols:[FieldSchema(name:group_id, type:int, comment:null), FieldSchema(name:group_name, type:string, comment:null)], location:file:/user/hive/groups, inputFormat:org.apache.hadoop.mapred.TextInputFormat, outputFormat:org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat, compressed:false, numBuckets:-1, serdeInfo:SerDeInfo(name:null, serializationLib:org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, parameters:{serialization.format=,, field.delim=,}), bucketCols:[], sortCols:[], parameters:{}, skewedInfo:SkewedInfo(skewedColNames:[], skewedColValues:[], skewedColValueLocationMaps:{}), storedAsSubDirectories:false), partitionKeys:[], parameters:{transient_lastDdlTime=1568231912, bucketing_version=2, totalSize=44, EXTERNAL=TRUE, numFiles=1}, viewOriginalText:null, viewExpandedText:null, tableType:EXTERNAL_TABLE, rewriteEnabled:false, catName:hive, ownerType:USER)
Time taken: 0.096 seconds, Fetched: 4 row(s)

From this output we can see: tableType:EXTERNAL_TABLE


Partitioning

To create a partitioned table we just have to add PARTITIONED BY with a list of columns to the table definition. Let's drop the previously created table users and re-create it partitioned by department:

CREATE TABLE IF NOT EXISTS users (
  user_name STRING COMMENT 'Name',
  user_roles ARRAY<STRING> COMMENT 'Roles',
  user_address STRUCT<city:STRING, street:STRING, zip:INT> COMMENT 'Address'
)
PARTITIONED BY (user_department STRING);


Now let's add some data:
hive (mydb)> insert into users (user_name, user_department) values("joe","AAA");
hive (mydb)> insert into users (user_name, user_department) values("moe","BBB");


And let's now check what we have in the file system:
dmitry@dmitry-ThinkPad-X220:/user/hive/warehouse/mydb.db/users$ pwd
/user/hive/warehouse/mydb.db/users
dmitry@dmitry-ThinkPad-X220:/user/hive/warehouse/mydb.db/users$ ll
total 16
drwxr-xr-x 4 dmitry dmitry 4096 wrz 12 21:07  ./
drwxr-xr-x 3 dmitry dmitry 4096 wrz 12 21:04  ../
drwxr-xr-x 2 dmitry dmitry 4096 wrz 12 21:07 'user_department=AAA'/
drwxr-xr-x 2 dmitry dmitry 4096 wrz 12 21:07 'user_department=BBB'/

Values with different departments are located in different folders.



Exporting data

To export data from tables we can use the following syntax:
hive> INSERT OVERWRITE LOCAL DIRECTORY '/home/dmitry/hive-export' SELECT * FROM GROUPS;

Now we can check the output directly from Hive (the columns below look glued together because the default field delimiter is the non-printable ^A character):
hive> ! less /home/dmitry/hive-export/000000_0;
1Sysdba
2Dev
3Others


Loading data

To load data we can use the following syntax:
LOAD DATA LOCAL INPATH '/home/dmitry/hive-export' OVERWRITE INTO TABLE GROUPS;


The end

And this is basically the end of this post:)


Sunday, June 23, 2019

Reactive web application with SpringBoot

0. Intro

The event-driven asynchronous approach is getting more and more popular.
When we're talking about REACTIVE we mean that we should react somehow to something. When it comes to web applications, most often that means we should react by showing some new information on the page when some event occurs.

With the traditional approach, the web page has to poll (send a request, receive a response) some REST service every time it wants to get a new portion of data:




With the reactive approach we don't have to poll; we just have to subscribe to some "event stream".

 


1. SpringBoot reactive web application

Let's now create a "reactive" Spring Boot application.

build.gradle:

plugins {
 id 'org.springframework.boot' version '2.1.5.RELEASE'
 id 'java'
}

apply plugin: 'io.spring.dependency-management'

group = 'com.demien'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '1.8'

configurations {
 developmentOnly
 runtimeClasspath {
  extendsFrom developmentOnly
 }
}

repositories {
 mavenCentral()
}

dependencies {
 implementation 'org.springframework.boot:spring-boot-starter-webflux'
 developmentOnly 'org.springframework.boot:spring-boot-devtools'
 testImplementation 'org.springframework.boot:spring-boot-starter-test'
 testImplementation 'io.projectreactor:reactor-test'
}

- we have to add the "webflux" dependency here.



Main application runner: 

package com.demien.reactweb;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class ReactwebApplication {

 public static void main(String[] args) {
  SpringApplication.run(ReactwebApplication.class, args);
 }

}


- nothing interesting here, just the start of the Spring Boot application.

Reactive rest controller: 

package com.demien.reactweb;

import java.time.Duration;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import reactor.core.publisher.Flux;

@RestController
public class RandomNumberController {

 private final Log log = LogFactory.getLog(RandomNumberController.class);

 @RequestMapping("/random")
 public Flux<Integer> random() {
  return Flux.interval(Duration.ofSeconds(5)).map(i -> {
   this.log.info("iteration:" + i);
   return generateRandomNumber();
  }).log();
 }

 private int generateRandomNumber() {
  return (int) (Math.random() * 1000);
 }

}

- the magic is happening here! First of all, we're returning Flux<Integer> - that means we're actually returning a "stream" of integers. Second, we're using a delay of 5 seconds between emitting elements.

2. HTML Web page

Here we have just one button. When we press it, it subscribes us to the events coming from the REST endpoint "/random" which we created in the previous step. Also, we're defining a "handler" (stringEventSource.onmessage) - what we should do when a new event arrives: we're just adding one more list item.


<html>

<head>

    <script>
        function registerEventSourceAndAddResponseTo(uri, elementId) {
            var stringEvents = document.getElementById(elementId);
            var stringEventSource = new EventSource(uri);
            stringEventSource.onmessage = function (e) {
                var newElement = document.createElement("li");
                newElement.innerHTML = e.data;
                stringEvents.appendChild(newElement);
            }
        }

        function subscribe() {
            registerEventSourceAndAddResponseTo("http://localhost:8080/random","display");
        }

    </script>

</head>

<body>

    <p>
        <button id="subscribe-button" onclick="subscribe()">Subscribe to random numbers</button>
        <ul id="display"></ul>
    </p>


</body>

</html>

3. Let's run it! 

I'm running my Spring Boot application and opening the web page:


Nothing happens so far. We have to press the button to subscribe to new events. Let's press it:



And now it's much better! Events are coming from the server side and the UI shows them!

4. The end. 

Full source code can be downloaded from here.

Wednesday, June 12, 2019

Microservices with Spring Boot 2

0. Intro

This is some kind of refreshment of one of my previous posts: "Microservices with spring boot"
updates:
- codebase migrated to Spring Boot version 2.1.4
- used "feign client" instead of "rest template" for micro-service interaction
- used "zipkin" and "sleuth.sampler" for application monitoring

I will not be providing a lot of details - they are in my previous post. DRY - don't repeat yourself :)

1. Architecture

Actually, the architecture is the same as in the previous post. The client communicates with 3 micro-services:





But apart from them, we also have some "infrastructure micro-services":



The next picture is taken from Zipkin to show the micro-service communication flow: we're calling the edge-server and providing it with the details of the server we actually want to call. Below, we're calling from the edge-server the cart-service (method cart/test), which calls through the edge-server the user-service 2 times (user/login and user/byToken) and, again through the edge-server, the item-service (item/getAll):


2. Discovery server 

The same stuff as in my previous post - I'm using Eureka:



3. Micro-services interaction

As I mentioned at the beginning, I'm using "feign client" for micro-service interaction. I'll show it on the example of "cart-service": it should call "user-service" to get user details, and it should also get some item details from "item-service".

3.1. Properties

file application.properties:

spring.application.name=cart-service
server.port=8100
eureka.client.serviceUrl.defaultZone=http://localhost:8761/eureka

- here we're defining the "coordinates" of our discovery service to get information about the services we may need (the user and item services)

file bootstrap.properties:

spring.zipkin.base-url=http://localhost:9411/
spring.sleuth.sampler.probability=1

- here we're defining the "coordinates" of the Zipkin application - we'll be sending request traces there.

3.2 Feign clients 

In the feign clients we're defining our "edge-server". Also, we're defining a "ribbon-client" for the service we want to call - there can be different instances of one service, so the ribbon client is needed for load-balancing. And finally, in the methods we're defining the exact REST services we want to call.

Item service feign client: 

package com.demien.sprcloud.cartservice.controller;
import java.util.List;

import org.springframework.cloud.netflix.ribbon.RibbonClient;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;

import com.demien.sprcloud.cartservice.domain.Item;

@FeignClient(contextId = "itemClient", name = "edge-server")
@RibbonClient(name = "item-service")
public interface ItemServiceProxy {

 @RequestMapping(value = "/item-service/item/{itemId}", method = RequestMethod.GET)
 public Item getById(@PathVariable("itemId") String itemId);

 @RequestMapping(value = "/item-service/item/getAll", method = RequestMethod.GET)
 public List<Item> getAll();

}



User service feign client: 

package com.demien.sprcloud.cartservice.controller;

import org.springframework.cloud.netflix.ribbon.RibbonClient;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;

@FeignClient(contextId = "userClient", name = "edge-server")
@RibbonClient(name = "user-service")
public interface UserServiceProxy {

 @RequestMapping(value = "/user-service/user/login", method = RequestMethod.POST)
 public String login(@RequestParam("userId") String userId, @RequestParam("userPassword") String userPassword);

 @RequestMapping(value = "/user-service/user/byToken/{tokenId}", method = RequestMethod.GET)
 public String userByToken(@PathVariable("tokenId") String tokenId);

}

3.3. Test method with interaction 

The next method just emulates some process where a user logs in and adds some items into the cart: our cart-service will be communicating with the item and user services. Now we can "autowire" our feign clients and just call them! The lines with the calls are in bold.

 @Autowired
 private UserServiceProxy userServiceProxy;

 @Autowired
 private ItemServiceProxy itemServiceProxy;

 public String getDefaultResponse() {
  return "Something is wrong. Please try again later";

 }

 @HystrixCommand(fallbackMethod = "getDefaultResponse")
 @RequestMapping(method = RequestMethod.GET, value = "/test")
 public String test() {
  final StringBuilder result = new StringBuilder();
  result.append("Logging in into userService as user1/pasword <br/> ");
  final String tokenId = this.userServiceProxy.login("user1", "password1");
  result.append("Received Token: " + tokenId + "<br/><br/>");
  result.append("Getting user details from userService by token <br/>");
  final String userDetails = this.userServiceProxy.userByToken(tokenId);
  result.append("Reseived UserDetails: " + userDetails + "<br/><br/>");

  result.append("Getting item list from itmService <br/>");
  final List<Item> items = this.itemServiceProxy.getAll();
  result.append("Reseived items: <br/>");
  items.forEach(item -> result.append("    " + item.toString() + " <br/>"));

  return result.toString();
 }

Now to test it we can open in browser URL: http://localhost:8765/cart-service/cart/test
and result should be:


Logging in into userService as user1/pasword 
Received Token: ec1a5d78-a8c5-4392-90bb-cbca7d8c9244

Getting user details from userService by token
Reseived UserDetails: {"id":"user1","name":"First User","address":"First Address"}

Getting item list from itmService
Reseived items:
Item(itemId=I6S, itemName=Iphone 6s, price=400.00)
Item(itemId=I7, itemName=Iphone 7, price=500.00)
Item(itemId=N5, itemName=Samsung galaxy note 5, price=450.00) 



- so we successfully called 2 micro-services!


4. Zipkin 

Zipkin lives here. It's a distributed tracing system. To use it I just downloaded the JAR file and started it with "java -jar zipkinFileName.jar".

My micro-services are already configured to send trace information to Zipkin (bootstrap.properties in #3.1). So when Zipkin is started it's possible to monitor them:




Now we can drill down to the details and find the picture I showed at the beginning:
I think this picture makes much more sense now - it's a "map" of the execution of my test REST service from #3.3.


5. The end. 

As I mentioned at the beginning, a lot of details were omitted, because they are present in my previous post. Full source code can be downloaded from here.

Friday, April 26, 2019

Java memory dump analysis (memory leaks detection)

0. Intro

In this post I'm going to show how to monitor an application using VisualVM and how to analyze a memory dump using Eclipse Memory Analyzer.

1. What is "memory leak"?

When the garbage collector is trying to "clean garbage", it looks for objects in the heap which are unreachable (no references to the object) and removes them. So, if for some reason we keep creating new objects and always keep references to them, such objects will not be removed from the heap by the garbage collector, and the application will use more and more memory until an OutOfMemory error.

- such types of problems are called "memory leaks".

2. Memory leak example application

Let's now create a simple application with a memory leak: we will be creating new objects and storing references to them in a list. So, the objects will not be available for garbage collection (they will be referenced by list elements).

package com.demien.memanaly;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

public class AppMain {

   class User {
      private String name;

      public User(String name) {
         this.name = name;
      }

      public String getName() {
         return name;
      }
   }

   private List<User> users = new ArrayList<>();

   public void run() throws InterruptedException {
      while (true) {
         users.add(new User(UUID.randomUUID().toString()));
         Thread.sleep(5);
      }
   }

   public static void main(String[] args) throws InterruptedException {
      System.out.println("Running");
      new AppMain().run();
   }

}


To make everything more obvious, let's decrease the available memory to 10MB with the VM argument -Xmx10m:




3. Run and monitor

To understand what a memory leak is, we have to remind ourselves how the garbage collector works: it tries to remove "unused" (unreachable) objects from the heap. When we create an object, use it for some time and stop using it, it will be removed from the heap by the garbage collector. But in our case we don't have "unused" (unreachable) objects, because we're keeping references to all our objects in the list. So there are no objects for the GC to collect, and the occupied space in the heap will continuously increase.

For monitoring heap usage we can use the VisualVM tool. It can be downloaded from here.
After starting it, you have to install the Visual GC plugin from Tools/Plugins/Available Plugins. After installing the plugin and running our application, we have to select the application on the left side of the screen and open a connection:



Next, just switch to the "Visual GC" tab of the just-installed plugin. It shows the current state of all heap areas. We're interested in the "Old Gen" area: as you can see, it's constantly growing. That means the GC cannot remove from the heap the objects we're creating - something is definitely wrong with our application:





4. Memory analyzing tool(MAT)

In the previous step we found that we have a memory leak. Now let's find out which objects are being "mistakenly" kept in the heap. Of course we know that the problem is with the "User" objects we're creating, but let's pretend that we don't know that :)

To analyze memory usage I used the Eclipse Memory Analyzer tool (MAT); it can be downloaded from here.
We have 2 options:
 - create a dump using VisualVM (the "Monitor" tab + "Heap Dump" button)
 - connect MAT directly to our process

I used second option:


After selecting my Java process and pressing "Finish", I selected "memory leaks" on the next screen. And MAT showed me the result:
Size: 2.6 MB Classes: 499 Objects: 56.4k Class Loader: 3 Unreachable Objects Histogram



After clicking on the largest (blue) area we can open the list of objects with outgoing references; after several "drill-downs" we will be able to see the following picture:



Of course, it's not a surprise for us: we have a very big ArrayList with objects of type User - that's our main problem. We have to stop keeping references to new objects in the ArrayList and we'll be fine.

5. The end 

Memory leaks are bad! :) All the heap space will get occupied and we will have an "out of memory" error. So we have to try to avoid situations like the one described above.

Tuesday, March 26, 2019

Spring Boot Data JPA (with REST)

0. Intro 

It's getting easier and easier to create REST services with Spring Boot: now it's possible to create them with just several annotations! Let's see :)

1. Project structure

I created a simple Gradle project with some tests:



2.  Dependencies (build.gradle)

We need:
- spring-data-jpa - to use repositories auto-generated by Spring
- spring-data-rest - to expose these repositories as REST services
- lombok - to reduce code for entity classes
- h2 - as the database

plugins {
   id 'org.springframework.boot' version '2.1.3.RELEASE'
   id 'java'
}

apply plugin: 'io.spring.dependency-management'

group = 'com.demien'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '1.8'

repositories {
   mavenCentral()
}

dependencies {
   implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
   implementation 'org.springframework.boot:spring-boot-starter-data-rest'
   runtimeOnly 'com.h2database:h2'
   compileOnly('org.projectlombok:lombok')
   testImplementation 'org.springframework.boot:spring-boot-starter-test'
}


3. Main runner class 

Nothing interesting here - just the Spring Boot runner:

package com.demien.sprdata;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SprdataApplication {

   public static void main(String[] args) {
      final SpringApplication app = new SpringApplication(SprdataApplication.class);
      app.run(args);
   }

}



4. Domain entities 

I created only 2 entities: the "parent" UserGroup and the "child" User.

4.1. UserGroup entity 

Thanks to Lombok it's very simple: we just have to list the properties and annotate the class with "@Entity":

package com.demien.sprdata.domain;

import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;

import lombok.Getter;
import lombok.Setter;

@Entity
@Getter
@Setter
public class UserGroup {

   @Id
   @GeneratedValue(strategy = GenerationType.AUTO)
   private Long id;
   private String name;
   private String description;

   public UserGroup() {

   }

}



4.2. User entity

It's a little bit more complicated: we have to define the relationship with the parent - we will be joining by the field groupId. And I'm also defining a named query.

package com.demien.sprdata.domain;

import javax.persistence.Entity;
import javax.persistence.FetchType;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.persistence.JoinColumn;
import javax.persistence.ManyToOne;
import javax.persistence.NamedQuery;

import lombok.Getter;
import lombok.Setter;

@Getter
@Setter
@Entity
@NamedQuery(name = "User.namedQueryByName", query = "SELECT u FROM User u WHERE u.name = :name ")
public class User {

   @Id
   @GeneratedValue(strategy = GenerationType.AUTO)
   private Long id;

   @ManyToOne(fetch = FetchType.LAZY)
   @JoinColumn(name = "groupId")
   private UserGroup group;

   private String name;
   private Integer salary;

   public User() {
   }

}


5. Repositories

The magic is happening here. We just have to define an interface... and that's it! We don't have to create the implementation - it will be created by Spring!!!

5.1. UserGroup repository 

I made it very simple: we're just extending CrudRepository and adding several methods.
"CRUD" means Create, Read, Update and Delete - so all these methods will be available in the implementation which will be generated by Spring. I also added 2 more methods: findAll and count.

package com.demien.sprdata.repository;

import org.springframework.data.repository.CrudRepository;

import com.demien.sprdata.domain.UserGroup;

public interface UserGroupRepository extends CrudRepository<UserGroup, Long> {

   Iterable<UserGroup> findAll();

   long count();
}


5.2. UserRepository 

It's more complicated. First of all, we're annotating it with @RepositoryRestResource to expose the methods as REST services. Next, I'm using a PagingAndSortingRepository, so paging and sorting features will be available. I'm also adding a lot of methods which will be generated by Spring:
- we can use the pattern [find | count] By [fieldName]
- we can even use fields of the parent entity (UserGroup, which is accessed by the "group" field in the User entity)
- we can define our own queries


package com.demien.sprdata.repository;

import java.util.List;

import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.PagingAndSortingRepository;
import org.springframework.data.repository.query.Param;
import org.springframework.data.rest.core.annotation.RepositoryRestResource;

import com.demien.sprdata.domain.User;

// http://localhost:8080/users/
@RepositoryRestResource(collectionResourceRel = "users", path = "users")
public interface UserRepository extends PagingAndSortingRepository<User, Long> {

   Iterable<User> findAll();

   long count();

   List<User> findByName(String name);

   Long deleteByName(String name);

   Long countByGroupName(String groupName);

   // find by parent entity : Group
   List<User> findByGroupName(String name);

   // defining custom query
   @Query("SELECT u FROM User u WHERE u.name LIKE CONCAT('%', :name, '%') ")
   List<User> queryByName(@Param("name") String name);

   // using named query defined in entity class
   List<User> namedQueryByName(@Param("name") String name);

   @Query(value = "SELECT * FROM User WHERE name = :name ", nativeQuery = true)
   List<User> nativeQueryByName(@Param("name") String name);
}

6. Testing 

And now let's test how this magic works

6.1 UserGroupRepository -  test

Here I'm testing just CRUD operations:


package com.demien.sprdata;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.orm.jpa.DataJpaTest;
import org.springframework.boot.test.autoconfigure.orm.jpa.TestEntityManager;
import org.springframework.test.context.junit4.SpringRunner;
import com.demien.sprdata.domain.UserGroup;
import com.demien.sprdata.repository.UserGroupRepository;

@DataJpaTest
@RunWith(SpringRunner.class)
public class UserGroupRepositoryTest {

   @Autowired
   private UserGroupRepository groupRepository;

   @Autowired
   private TestEntityManager em;

   @Test
   public void findAllTest() {
      final List<UserGroup> groups = new ArrayList<>();
      groupRepository.findAll().forEach(group -> groups.add(group));
      assertEquals(4, groups.size());
   }

   @Test
   public void checkUserGroupCount() {
      assertEquals(4, groupRepository.count());
   }

   @Test
   public void findOne() {
      Optional<UserGroup> opGroup = groupRepository.findById(1001L);
      assertTrue(opGroup.isPresent());
      assertEquals("ADM", opGroup.get().getName());
      groupRepository.deleteById(1001L);
      opGroup = groupRepository.findById(1001L);
      assertFalse(opGroup.isPresent());
   }

   @Test
   public void createNewTest() {
      final UserGroup newGroup = new UserGroup();
      newGroup.setDescription("Created");
      groupRepository.save(newGroup);
      assertTrue(newGroup.getId() != null);
      em.flush();
      final Optional<UserGroup> loaded = groupRepository.findById(newGroup.getId());
      assertTrue(loaded.isPresent());
      assertTrue(loaded.get().getDescription().equals("Created"));
      groupRepository.deleteById(newGroup.getId());
   }

}



6.2. UserRepository - test

And the most interesting part is happening here: I'm testing paging, sorting, named queries, native queries...

package com.demien.sprdata;

import static org.junit.Assert.assertTrue;
import java.util.List;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.orm.jpa.DataJpaTest;
import org.springframework.boot.test.autoconfigure.orm.jpa.TestEntityManager;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Sort;
import org.springframework.test.context.junit4.SpringRunner;
import com.demien.sprdata.domain.User;
import com.demien.sprdata.repository.UserRepository;

@DataJpaTest
@RunWith(SpringRunner.class)
public class UserRepositoryTest {

   @Autowired
   UserRepository userRepository;

   @Autowired
   TestEntityManager em;

   @Test
   public void sortTest() {
      Sort sort = new Sort(Sort.Direction.ASC, "group_id").and(new Sort(Sort.Direction.DESC, "name"));
      Iterable<User> users = userRepository.findAll(sort);
      User first = users.iterator().next();
      assertTrue(first.getGroup().getId() == 1001L);
      assertTrue(first.getName().equals("Victor"));
   }

   @Test
   public void pagingTest() {
      final PageRequest pageRequest = PageRequest.of(0, 2);
      final Page<User> userPage = userRepository.findAll(pageRequest);
      assertTrue(userPage.getNumberOfElements() == 2);
      assertTrue(userPage.getTotalPages() == 4);
   }

   @Test
   public void findTest() {
      List<User> users = userRepository.findByName("Joe");
      assertTrue(users.size() == 1);
      assertTrue(users.get(0).getName().equals("Joe"));
      Long countByGroupName = userRepository.countByGroupName("ADM");
      assertTrue(countByGroupName == 3L);
      users = userRepository.findByGroupName("ADM");
      assertTrue(users.size() == 3);
      users = userRepository.queryByName("a");
      assertTrue(users.size() == 5);
      assertTrue(users.get(0).getName().contains("a"));
      users = userRepository.namedQueryByName("Charles");
      assertTrue(users.size() == 1);
      assertTrue(users.get(0).getId() == 104L);
      users = userRepository.nativeQueryByName("Mario");
      assertTrue(users.size() == 1);
      assertTrue(users.get(0).getId() == 105L);
   }

}


7. Rest services

And the final thing: let's now run our application runner and open this URL in a browser: http://localhost:8080/users/

It should be something like :


{
  "_embedded" : {
    "users" : [ {
      "name" : "Joe",
      "salary" : 100,
      "_links" : {
        "self" : {
          "href" : "http://localhost:8080/users/101"
        },
        "user" : {
          "href" : "http://localhost:8080/users/101"
        },
        "group" : {
          "href" : "http://localhost:8080/users/101/group"
        }
      }
    }, {
      "name" : "Huan",
      "salary" : 200,
      "_links" : {
        "self" : {
          "href" : "http://localhost:8080/users/102"
        },
        "user" : {
          "href" : "http://localhost:8080/users/102"
        },
        "group" : {
          "href" : "http://localhost:8080/users/102/group"
        }
      }
    }, {
      "name" : "Michael",
      "salary" : 300,
      "_links" : {
        "self" : {
          "href" : "http://localhost:8080/users/103"
        },
        "user" : {
          "href" : "http://localhost:8080/users/103"
        },
        "group" : {
          "href" : "http://localhost:8080/users/103/group"
        }
      }
    }, {
      "name" : "Charles",
      "salary" : 100,
      "_links" : {
        "self" : {
          "href" : "http://localhost:8080/users/104"
        },
        "user" : {
          "href" : "http://localhost:8080/users/104"
        },
        "group" : {
          "href" : "http://localhost:8080/users/104/group"
        }
      }
    }, {
      "name" : "Mario",
      "salary" : 200,
      "_links" : {
        "self" : {
          "href" : "http://localhost:8080/users/105"
        },
        "user" : {
          "href" : "http://localhost:8080/users/105"
        },
        "group" : {
          "href" : "http://localhost:8080/users/105/group"
        }
      }
    }, {
      "name" : "Jan",
      "salary" : 300,
      "_links" : {
        "self" : {
          "href" : "http://localhost:8080/users/106"
        },
        "user" : {
          "href" : "http://localhost:8080/users/106"
        },
        "group" : {
          "href" : "http://localhost:8080/users/106/group"
        }
      }
    }, {
      "name" : "Victor",
      "salary" : 500,
      "_links" : {
        "self" : {
          "href" : "http://localhost:8080/users/107"
        },
        "user" : {
          "href" : "http://localhost:8080/users/107"
        },
        "group" : {
          "href" : "http://localhost:8080/users/107/group"
        }
      }
    } ]
  },
  "_links" : {
    "self" : {
      "href" : "http://localhost:8080/users{?page,size,sort}",
      "templated" : true
    },
    "profile" : {
      "href" : "http://localhost:8080/profile/users"
    },
    "search" : {
      "href" : "http://localhost:8080/users/search"
    }
  },
  "page" : {
    "size" : 20,
    "totalElements" : 7,
    "totalPages" : 1,
    "number" : 0
  }
}

- HATEOAS is in place  !

8. The end

As I mentioned at the beginning, with the Spring Boot stack we can create REST services with DB repositories by just adding a few annotations! Full source code can be downloaded from here.