Tuesday, December 11, 2018

Spring: how can I test my bean?

0. Intro

Testing is very important part of development. In this post I want to show most popular options of testing Spring application.

1. Test application

I want to create the simulation of permission checking logic: main idea is to check if the user has access to group. This operation will be performed by PermissionService. But it also has dependency to service UserGroupService which can return list of user groups .

1.1. Project structure

I have two domain classes: User and Group, two interfaces/services: Permission and UserGroup, config class and main runner: App class.



1.2. Gradle build

I'm using lombok to reduce some boilerplate code, so lombok plugin is being applied. Also, and beside spring  dependencies I have mockito for testing.


plugins {
    id 'java'    id 'io.franzbecker.gradle-lombok' version '1.8'    id 'application'}

group 'com.demien'version '1.0-SNAPSHOT'
sourceCompatibility = 1.8
repositories {
    mavenCentral()
}

dependencies {
    compile group: 'org.springframework', name: 'spring-core', version: '5.1.3.RELEASE'    compile group: 'org.springframework', name: 'spring-context', version: '5.1.3.RELEASE'    compile group: 'org.slf4j', name: 'slf4j-simple', version: '1.7.25'

    testCompile group: 'junit', name: 'junit', version: '4.12'    testCompile group: 'org.springframework', name: 'spring-test', version: '5.1.3.RELEASE'    testCompile group: 'org.mockito', name: 'mockito-all', version: '1.10.19'}

mainClassName = 'com.demien.springtest.App'

1.3. Services

For services I'm creating interface and implementation.

UserGroupService should return for provided user list of assigned  groups:

package com.demien.springtest.service;
import com.demien.springtest.domain.Group;import com.demien.springtest.domain.User;import java.util.List;
public interface UserGroupService {

    List<Group> getUserGroups(User user);
}


Of course in real life it should call DB or another service to get this list of groups, but for this example I'm just returning two "hardcoded" groups: "first" and "second". 

package com.demien.springtest.service;
import com.demien.springtest.domain.Group;import com.demien.springtest.domain.User;import lombok.extern.slf4j.Slf4j;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.stereotype.Repository;
import java.util.Arrays;import java.util.List;
@Slf4j@Repository@Qualifier("UserGroupService-main")
public class UserGroupServiceImpl implements UserGroupService {

    @Override    public List<Group> getUserGroups(User user) {
      log.info("It should be DB call here");        return Arrays.asList(
                new Group("first"),                new Group("second")
        );    }
}


Permission service should check if provided user has access to provided group.

package com.demien.springtest.service;
import com.demien.springtest.domain.User;
public interface PermissionService {
    boolean hasAccess(User user, String groupName);}

For this, service is calling UserGroupService to get list of groups assigned to user and checks if provided group exists in this list.

package com.demien.springtest.service;
import com.demien.springtest.domain.User;import lombok.extern.slf4j.Slf4j;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.stereotype.Service;
@Slf4j@Service@Qualifier("PermissionService-main")
public class PermissionServiceImpl implements PermissionService {

    @Autowired    @Qualifier("UserGroupService-main")
    private UserGroupService userGroupService;
    @Override    public boolean hasAccess(User user, String groupName) {
        log.info("calling userGroupService");        return userGroupService.getUserGroups(user).stream().anyMatch(group -> group.getName().equals(groupName));    }
}

1.4 Other stuff. 

Domain objects are very simple:
Group:
package com.demien.springtest.domain;
import lombok.AllArgsConstructor;import lombok.Getter;
@Getter@AllArgsConstructorpublic class Group {

    String name;}


User:
package com.demien.springtest.domain;
public class User {
}


Config class is just specifying package to scan for spring annotations:
package com.demien.springtest.config;
import org.springframework.context.annotation.ComponentScan;import org.springframework.context.annotation.Configuration;
@Configuration@ComponentScan(basePackages =  {"com.demien.springtest.service"})
public class MainConfig {

}

Main class: little bit more interesting: we have to start spring context get our PermissionService and check if user has access to group:

package com.demien.springtest;
import com.demien.springtest.config.MainConfig;import com.demien.springtest.domain.User;import com.demien.springtest.service.PermissionService;import lombok.extern.slf4j.Slf4j;import org.springframework.context.ApplicationContext;import org.springframework.context.annotation.AnnotationConfigApplicationContext;
@Slf4jpublic class App {
    public static void main(String[] args) {
        final User dummyUser = new User();        ApplicationContext context = new AnnotationConfigApplicationContext(MainConfig.class);        PermissionService permissionService =  context.getBean(PermissionService.class);        log.info("Permission result: for [admin] group: " +  permissionService.hasAccess(dummyUser, "admin"));        log.info("Permission result: for [first] group: " +  permissionService.hasAccess(dummyUser, "first"));    }
}

1.4. Execution

As expected, PermissionService calls userGroupService, which actually has to call some DB, and our user has access to group "first" and don't have access to group "admin" (we hardcoded "first" and "second" in userGroupService)

[main] INFO com.demien.springtest.service.PermissionServiceImpl - calling userGroupService
[main] INFO com.demien.springtest.service.UserGroupServiceImpl - It should be DB call here
[main] INFO com.demien.springtest.App - Permission result: for [admin] group: false

[main] INFO com.demien.springtest.service.PermissionServiceImpl - calling userGroupService
[main] INFO com.demien.springtest.service.UserGroupServiceImpl - It should be DB call here
[main] INFO com.demien.springtest.App - Permission result: for [first] group: true



2. Testing 

Application seems to be working, but how can we create some unit tests for it?

2.1. Spring test

Most straightforward approach here  - just run our main spring context, execute hasAccess mechod and check the results:


package com.demien.springtest;
import com.demien.springtest.config.MainConfig;import com.demien.springtest.domain.User;import com.demien.springtest.service.PermissionService;import org.junit.Test;import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.test.context.ContextConfiguration;import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import static org.junit.Assert.*;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = MainConfig.class)
public class PermissionServiceSpringTest {

    final User dummyUser = new User();
    @Autowired    @Qualifier("PermissionService-main")
    private PermissionService permissionService;

    @Test    public void calculateSum() {
        assertTrue(permissionService.hasAccess(dummyUser, "first"));        assertFalse(permissionService.hasAccess(dummyUser, "zero"));    }
}

Looks good, but the problem here: we have to start our main spring context, what if it has 100 beans? And anyway, we just want to test PermissionService but it's calling UserGroupService inside - so it's not a unit test, but integration test!

2.2 Spring stub test

Ok, we understand now that it's not a good idea to run the whole our main spring context, so let's create a test context with "stub" implementation of UserGroupService.

package com.demien.springtest;
import com.demien.springtest.domain.Group;import com.demien.springtest.domain.User;import com.demien.springtest.service.PermissionService;import com.demien.springtest.service.PermissionServiceImpl;import com.demien.springtest.service.UserGroupService;import org.junit.Test;import org.junit.runner.RunWith;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.test.context.ContextConfiguration;import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import java.util.Arrays;import java.util.List;
import static org.junit.Assert.assertFalse;import static org.junit.Assert.assertTrue;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = PermissionServiceStubTest.TestConfig.class)
public class PermissionServiceStubTest {

    @Configuration    static class TestConfig {

        @Bean        @Qualifier("UserGroupService-main")
        public UserGroupService testUserGroupService() {
            return new UserGroupServiceTestImpl();        }

        @Bean        @Qualifier("PermissionService-test")
        public PermissionService testPermissionService() {
            return new PermissionServiceImpl();        }

    }

    static class UserGroupServiceTestImpl implements UserGroupService {

        @Override        public List<Group> getUserGroups(User user) {
            return Arrays.asList(
                    new Group("stubGroup1"), new Group("stubGroup2")
            );        }
    }

    final User dummyUser = new User();
    @Autowired    @Qualifier("PermissionService-test")
    private PermissionService permissionService;

    @Test    public void calculateSum() {
        assertTrue(permissionService.hasAccess(dummyUser, "stubGroup2"));        assertFalse(permissionService.hasAccess(dummyUser, "stubGroup5"));    }

}

It's much better now! We're not running our main context - just test context with needed beans. And real beans can be replaced with "stub" (UserGroupServiceTestImpl) versions!

2.3 Mock test

The problem with previous approach is: looks like for every bean, we will have to create an additional test context, and create a stub implementation of every dependency. Can we make it more simple? Sure we can: using mocks!

package com.demien.springtest;
import com.demien.springtest.domain.Group;import com.demien.springtest.domain.User;import com.demien.springtest.service.PermissionService;import com.demien.springtest.service.PermissionServiceImpl;import com.demien.springtest.service.UserGroupService;import org.junit.Assert;import org.junit.Before;import org.junit.Test;import org.junit.runner.RunWith;import org.mockito.BDDMockito;import org.mockito.InjectMocks;import org.mockito.Matchers;import org.mockito.Mock;import org.mockito.runners.MockitoJUnitRunner;
import java.util.Arrays;
@RunWith(MockitoJUnitRunner.class)
public class PermissionServiceMockTest {

    @Mock    private UserGroupService userGroupService;
    @InjectMocks    private PermissionService permissionService = new PermissionServiceImpl();
    final User dummyUser = new User();
    @Before    public void init() {
        BDDMockito.given(userGroupService.getUserGroups(Matchers.any(User.class))).willReturn(
                Arrays.asList(new Group("mockGroup1"), new Group("mockGroup2"), new Group("mockGroup3") )
        );    }

    @Test    public void test() {
        Assert.assertTrue(permissionService.hasAccess(dummyUser, "mockGroup2"));        Assert.assertFalse(permissionService.hasAccess(dummyUser, "newGroup"));    }

}


Now don't have to start spring context at all! We just have to define behavior of our mock of UserGroupService and run the test! Excellent ! But can it be better?

2.4. Refactoring

Let's take a closer look now. Actually, the main functionality we want to test is here:
userGroupService.getUserGroups(user).stream().anyMatch(group -> group.getName().equals(groupName));

Let's now try to split it into 2 functions:

public boolean contains(List<Group> groups, String groupName) {
    return groups.stream().anyMatch(group -> group.getName().equals(groupName));}

@Overridepublic boolean hasAccess(User user, String groupName) {
    log.info("calling userGroupService");    return contains(userGroupService.getUserGroups(user), groupName);}


Now our business logic isolated from userGroupService in method "contains" . And we can easily test with method using just JUnit without any mocks/stubs/spring contexts!

@Testpublic void containsTest() {
    PermissionServiceImplRefactored refactored = new PermissionServiceImplRefactored();    List<Group> testGroups = Arrays.asList(
            new Group("first"),            new Group("second")
    );    Assert.assertTrue(refactored.contains(testGroups, "first"));    Assert.assertFalse(refactored.contains(testGroups, "trash"));}


2.5. The end.

As always, simplest way is the best way! And remember, if you're starting spring context - it's not a unit test!  Full source code can be downloaded from here

Wednesday, June 13, 2018

Oauth2 with Spring Boot simple example

I this post, using spring boot, I'll show a basic Oauth2 flow with :
 - Authorization server
 - Client app which logs in to Authorization server using username and password, takes login token as a response of successful login and calls resource server with received token.
 - Resource server(which have protected resource) handles requests, grabs token from the request, validates tokens on Authorization server, returns requested data.

So,  I have to create 3 separate spring boot applications(Authorization server, Client app, Resource server), run them, and make sure all flow works.


0. Dependencies

For simplicity I'm using the same dependencies for all 3 applications:
dependencies {
    compile('org.springframework.boot:spring-boot-starter-web')
    compile('org.springframework.cloud:spring-cloud-starter-oauth2')
    compile('org.springframework.cloud:spring-cloud-starter-security')
    testCompile('org.springframework.boot:spring-boot-starter-test')
}

1. Authorization server

Authorization server it's a spring boot application which will be used to authorize user by credentials sent by client application. As a response it should send a token back to client. 
In properties, we are defining clientId, clientSecret (password) which should be used for authorization. Also we should define grant-types, and if we need them - scopes (in this example scope will not be used).

application.properties:

server.port: 9000
server.servlet.context-path: /servicessecurity.oauth2.client.clientId: myClientIdsecurity.oauth2.client.clientSecret: myClientSecretsecurity.oauth2.client.authorized-grant-types: authorization_code,refresh_token,password,client_credentialssecurity.oauth2.client.scope:data_insert,data_update, data_delete, data_select


In service config file  I' defining users(huan and joe) of my application with passwords. Of course, in real life they will not be defined in a code, but should be stored in DB.

ServiceConfig:
package com.demien.sboot.oauthserver;
import org.springframework.context.annotation.Configuration;import org.springframework.security.config.annotation.authentication.builders.AuthenticationManagerBuilder;import org.springframework.security.config.annotation.authentication.configuration.GlobalAuthenticationConfigurerAdapter;
@Configurationpublic class ServiceConfig extends GlobalAuthenticationConfigurerAdapter {
    @Override    public void init(AuthenticationManagerBuilder auth) throws Exception {
        auth.inMemoryAuthentication()
                .withUser("huan").password("{noop}sebastyan").roles("USER") .and()
                .withUser("joe").password("{noop}black").roles("USER", "ADMIN");    }
}


In server runner application we have to define endpoint for getting user details - we will use it later.

Server runner application:
package com.demien.sboot.oauthserver;
import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;import org.springframework.security.oauth2.config.annotation.web.configuration.EnableAuthorizationServer;import org.springframework.security.oauth2.config.annotation.web.configuration.EnableResourceServer;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;
import java.security.Principal;
@SpringBootApplication@EnableAuthorizationServer@EnableResourceServer@RestControllerpublic class OauthServerApp {
    public static void main(String[] args) {
        SpringApplication.run(OauthServerApp.class, args);    }

    @RequestMapping("/user")
    public Principal user(Principal user) {
        return user;    }
}


2. Resource server

Resource server is a spring boot application which has some protected resource(endpoint "/mydata"), which is not accessible without authorization.  Client should provide authorization token to call this endpoint. In properties file we should define endpoint from authorization server mentioned above.

application.properties
server.port=9001server.servlet.context-path=/servicessecurity.oauth2.resource.userInfoUri:http://localhost:9000/services/user


Service config:

package com.demien.sboot.service;
import org.springframework.context.annotation.Configuration;import org.springframework.security.access.expression.method.MethodSecurityExpressionHandler;import org.springframework.security.config.annotation.method.configuration.EnableGlobalMethodSecurity;import org.springframework.security.config.annotation.method.configuration.GlobalMethodSecurityConfiguration;import org.springframework.security.oauth2.provider.expression.OAuth2MethodSecurityExpressionHandler;
@Configuration@EnableGlobalMethodSecurity(prePostEnabled = true)
public class ServiceConfig extends GlobalMethodSecurityConfiguration {

    @Override    protected MethodSecurityExpressionHandler createExpressionHandler() {
        return new OAuth2MethodSecurityExpressionHandler();    }
}


In main class we're just defining endpoint and class with some very important data.

Application runner:

package com.demien.sboot.service;
import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;import org.springframework.boot.autoconfigure.security.oauth2.resource.ResourceServerProperties;import org.springframework.context.annotation.Bean;import org.springframework.security.access.prepost.PreAuthorize;import org.springframework.security.oauth2.config.annotation.web.configuration.EnableResourceServer;import org.springframework.security.oauth2.provider.token.ResourceServerTokenServices;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;
import java.util.ArrayList;
@SpringBootApplication@RestController@EnableResourceServerpublic class ServiceApp {

    public static void main(String[] args) {
        SpringApplication.run(ServiceApp.class, args);    }

    @RequestMapping("/mydata")
    public ArrayList<MyData> getTollData() {

        ArrayList<MyData> result = new ArrayList<MyData>();        result.add(new MyData(1L, "one"));        result.add(new MyData(2L, "two"));        result.add(new MyData(3L, "three"));
        return result;    }


    public class MyData {

        public final Long myId;        public final String myValue;
        public MyData(Long myId, String myValue) {
            this.myId = myId;            this.myValue = myValue;        }

        public Long getMyId() {
            return myId;        }

        public String getMyValue() {
            return myValue;        }
    }


}

3. Command line client 


Client application is a most interesting thing here. First of all we have to define in properties detail for authorization.

application.yml
server:  port: 9090
  servlet:    context-path: /services

security:  oauth2:    client:      clientId: myClientId
      clientSecret: myClientSecret
      accessTokenUri: http://localhost:9000/services/oauth/token
      userAuthorizationUri: http://localhost:9000/services/oauth/authorize
      clientAuthenticationScheme: form
    resource:      userInfoUri: http://localhost:9000/services/user
      preferTokenInfo: false


In main application runner we are about to call our protected endpoint: http://localhost:9001/services/mydata
But for this call we should be authorized first. I'm using credentials of user "joe" for this.
Also I'm printing authorization token, to make sure we have it.
And finally I'm calling this endpoint. 

Application runner:
package com.demien.sboot.client;
import org.springframework.boot.CommandLineRunner;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;import org.springframework.security.oauth2.client.OAuth2RestTemplate;import org.springframework.security.oauth2.client.token.grant.password.ResourceOwnerPasswordResourceDetails;import org.springframework.security.oauth2.common.AuthenticationScheme;
import java.util.Arrays;
@SpringBootApplicationpublic class CommandLineApp implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(CommandLineApp.class, args);    }


    @Override    public void run(String... args) throws Exception {
        System.out.println("starting");        ResourceOwnerPasswordResourceDetails resourceDetails = new ResourceOwnerPasswordResourceDetails();        resourceDetails.setAuthenticationScheme(AuthenticationScheme.header);        resourceDetails.setAccessTokenUri("http://localhost:9000/services/oauth/token");        resourceDetails.setScope(Arrays.asList("data_select"));        resourceDetails.setClientId("myClientId");        resourceDetails.setClientSecret("myClientSecret");        resourceDetails.setUsername("joe");        resourceDetails.setPassword("black");
        OAuth2RestTemplate restTemplate = new OAuth2RestTemplate(resourceDetails);        String token = restTemplate.getAccessToken().getValue();        System.out.println("token:" + token);
        String s = restTemplate.getForObject("http://localhost:9001/services/mydata", String.class);        System.out.println("Result:" + s);    }
}

4. Execution

First of all we should start Authorization and resource services.
After that, we're free to go: let's start our client.
For me it produced next output:

starting
token:2b9560c9-355d-4134-a63b-e10f05b1b9b4
Result:[{"myId":1,"myValue":"one"},{"myId":2,"myValue":"two"},{"myId":3,"myValue":"three"}]

It looks simple, but under the hood client called authorization server, to get token, called resource service with token saved in session. Resource service called authorization server again to validate the token and after that - returned result back to client.


5. The end

Source code can be downloaded from here

Sunday, May 20, 2018

Getting started with Akka


1. Intro

In a world of concurrency and multi-threading, akka is very popular solution, because it provides lightweight actor system. In this post I'm  about to show a very simple example of application with akka actor system. 

So what is akka ? From wiki: 

Akka is a free and open-source toolkit and runtime simplifying the construction of concurrent and distributed applications on the JVM. Akka supports multiple programming models for concurrency, but it emphasizes actor-based concurrency, with inspiration drawn from Erlang.[citation needed]


The main idea is: parallel tasks should be executed by actors(workers). Each actor can receive  and send messages.  For example, actor can receive task with details about work to do. And after doing this work - can send results back to sender(or in more complicated flow - can delegate some pieces of work to another actors).




2. Application structure

In this application we will be counting words in a file. We will have 2 types of actors:
- FileWordCounterActor (jus one such actor) - responsible for counting words in the whole file
- LineWordCounterActor(one actor per line) - responsible for counting words in a line

Communication between main function and actors will be via messages:
 - we have to send a message to  FileWordCounterActor with the name of file, for which we want to count the words
 - later FileWordCounterActor should split the file into lines, create for every line LineWordCounterActor and send him a message with this line
 - LineWordCounterActor have to count the words in a line and send a result message with this count back to FileWordCounterActor
 - and finally FileWordCounterActor have to sum all received from LineWordCounterActors counts and return this sum back to main function





3. build.sbt

We need just one dependency for akka

name := "akka-demo"
version := "0.1"
scalaVersion := "2.11.8"
resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/"
libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.5.12"



4. Main application class

In main application class we are creating actor system and from it creating our main actor: FileWordCounterActor. In akka library a lot of methods require implicit parameters - to avoid this implicit "magic" and make code more understandable I used caring. All we need to do in main method is:
- send a message to our main actor with file name
- from future object from message sending we can get the result and print it

package com.demien.akkademo

import akka.actor.{ActorSystem, Props}
import akka.util.Timeout
import scala.concurrent.duration._
import akka.pattern.ask
import akka.dispatch.ExecutionContexts.global
object AkkaApp extends App {

  val system = ActorSystem("System")
  val wcActor = system.actorOf(Props(new FileWordCounterActor(args(0))))
  val wcActorAskWithTimeout = wcActor.ask(_: Any)(Timeout(10 seconds))

  val askingFuture = wcActorAskWithTimeout(StartProcessFileMsg())
  val askingFutureGlobalContextMap = askingFuture.map(_: Any => Any)(global())

  askingFutureGlobalContextMap(result => {
    println("Total number of words " + result)
    system.terminate()
    result
  })


}


5. FileWordCounter actor

In this actor we are receiving message with file name, opening this file and for each line in this file we are creating the "child" actor: LineWordCounterActor and sending him the message with line from file. Also we are receiving from this child actor result of calculation and accumulating it. At the end, when every line was processed, we can return the result to our main activity.

package com.demien.akkademo

import akka.actor.{Actor, ActorRef, Props}

case class StartProcessFileMsg()

class FileWordCounterActor(filename: String) extends Actor {

  private var totalLines = 0  private var linesProcessed = 0  private var result = 0  private var fileSender: ActorRef = null
  def receive = {
    case StartProcessFileMsg() => {
      fileSender = sender
      import scala.io.Source._
      fromFile(filename).getLines.foreach { line =>
        val lineWordCounterActor = context.actorOf(Props[LineWordCounterActor])
        lineWordCounterActor.tell(CountWordsInLineMsg(line), self)
        totalLines += 1      }

    }
    case WordsInLineResultMsg(wordsCount) => {
      result += wordsCount
      linesProcessed += 1      if (linesProcessed == totalLines) {
        fileSender.tell(result, self)
      }
    }
    case _ => println("message not recognized!")
  }
}



6. LineWordCounter actor

Here we just have to count number of words in the line and send result to "parent" actor.

package com.demien.akkademo

import akka.actor.Actor

case class CountWordsInLineMsg(line: String)

case class WordsInLineResultMsg(words: Integer)

class LineWordCounterActor extends Actor {
  def receive = {
    case CountWordsInLineMsg(string) => {
      val wordsInLine = string.split(" ").length
      sender.tell(WordsInLineResultMsg(wordsInLine), self)
    }
    case _ => println("Error: message not recognized")
  }
}



7. Execution

For execution we have to pass command line parameter with the file name: 




And the execution result should be something like this:

Total number of words 1400

Process finished with exit code 0


8. The end

Full source code can be downloaded from here.

Saturday, April 28, 2018

Apache Spark - getting started: batch and stream data processing using scala

0. Intro 

In this post I'm going to explain basics of Apache Spark: RDD, SparkSQL, DataFrames, SparkStreaming. Actually Spark -  it's just a library for data processing. So it can be executed without any BigData-related stuff. You can just run code from this post without any Hadoop/HDFS.


1. What is Apache Spark

from wiki:
Apache Spark is an open-source cluster-computing framework. Originally developed at the University of California, Berkeley's AMPLab, the Spark codebase was later donated to the Apache Software Foundation, which has maintained it since. Spark provides an interface for programming entire clusters with implicit data parallelism and fault tolerance.

From official documentaion:
Apache Spark is a fast and general-purpose cluster computing system. It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs. It also supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Spark Streaming.



As you can see, there is no mentioning of Hadoop/HDFS at all. It CAN work with Hadoop/HDFS, it CAN work with cluster resource manager like YARN. But for getting familiar,  also it can be used  for processing local files in standalone mode.

2. What is Lambda architecture

From wiki:
Lambda architecture is a data-processing architecture designed to handle massive quantities of data by taking advantage of both batch- and stream-processingmethods. This approach to architecture attempts to balance latencythroughput, and fault-tolerance by using batch processing to provide comprehensive and accurate views of batch data, while simultaneously using real-time stream processing to provide views of online data. The two view outputs may be joined before presentation. The rise of lambda architecture is correlated with the growth of big data, real-time analytics, and the drive to mitigate the latencies of map-reduce.[1]





In shorts, lambda architecture is a combination of 2 processing types:
- slow but precese - in this post we will use Spark RDD/DataFrame for it
- fast but not precise -  in this post we will use Spark Streaming for it



3. Project setup: pom.xml

Here I'm using maven with scala. In pom.xml I'm adding dependencies for spark-core, spark-sql, spark-streaming.


<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0"         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.demien</groupId>
    <artifactId>sparktest</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.3.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.3.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.3.0</version>
        </dependency>


    </dependencies>


</project>




4. Project structure




As you can see, we will be using 2 processing types of batch processing: RDD and DataFrame. And one type of stream processing: DStream. Apart from that we will be using SparkUtils class for creation of common sprak entities: SparkContext, SparkSession. And a generator of text files(for streaming) TextFileCreator.

5. SparkUtils

As I mentioned before, we will be using SparkUtils class for creation of common sprak entities: SparkContext, SparkSession:

package com.demien.sparktest

import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}

object SparkUtils {

  val DEF_APP_NAME = "MySparkApp"  val DEF_MASTER = "local[2]"
  //The SparkContext object - connection to a Spark execution environment and created RDDs  def getSparkContext(appName: String, master: String): SparkContext = new SparkContext(new SparkConf().setAppName(appName).setMaster(master))

  def getSparkContext(): SparkContext = getSparkContext(DEF_APP_NAME, DEF_MASTER)

  //The SparkSession - connection to dataframes and SQLs  def getSparkSession(appName: String, master: String): SparkSession = SparkSession
    .builder()
    .appName(appName)
    .master(master)
    .getOrCreate()

  def getSparkSession(): SparkSession = getSparkSession(DEF_APP_NAME, DEF_MASTER)


}



6. Batch processing: RDD


From official documentation:

The main abstraction Spark provides is a resilient distributed dataset (RDD), which is a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel. RDDs are created by starting with a file in the Hadoop file system (or any other Hadoop-supported file system), or an existing Scala collection in the driver program, and transforming it. Users may also ask Spark to persist an RDD in memory, allowing it to be reused efficiently across parallel operations. Finally, RDDs automatically recover from node failures.

RDD Example: 

package com.demien.sparktest.batch

import com.demien.sparktest.SparkUtils
import org.apache.spark.rdd.RDDwith

// https://spark.apache.org/docs/2.3.0/rdd-programming-guide.htmlobject RddExample extends App {

  val sc = SparkUtils.getSparkContext()
  val file = sc.textFile("src/main/resources/sample.txt")
  val words: RDD[String] = file.flatMap(l => l.split(" ")).filter(w => w.length > 1)
  val pairs: RDD[(String, Int)] = words.map(s => (s, 1)) // [the, of, the] => (the, 1) (of, 1) (the, 1)  val counts: RDD[(String, Int)] = pairs.reduceByKey((a, b) => a + b) // (the, 1) (of, 1) (the, 1) => (the, 2) (of, 1)  val countByWord: RDD[(Int, String)] = counts.map(p => p.swap) // (the, 2) (of, 1) => (2, the) (1, of)  val countByWordSorted: RDD[(Int, String)] = countByWord.sortByKey(false)
  val top5 = countByWordSorted.take(5)

  top5.foreach(p => println(p))
}

- I added comments and  datatypes for RDD variables(which of cource are not needed here) to make it more clear. This RDD example is processing sample text file - it's just a text from ApacheSpark wiki. We are splitting text into words, creating for every work "paired object" with word itself and number 1. After that, we are groupping these pairs using word as a key and counting provided numbers.

Results:
(55,the)
(46,of)
(43,Spark)
(39,and)
(24,in)

- as you can see, most popular word(excluding "the", "of", "and", "in") in Spark wiki is "Spark" :)

7. Batch processing: DataFrame/Spark SQL


From official documentation:

A DataFrame is a Dataset organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources such as: structured data files, tables in Hive, external databases, or existing RDDs. The DataFrame API is available in Scala, Java, Python, and R. In Scala and Java, a DataFrame is represented by a Dataset of Rows. In the Scala APIDataFrame is simply a type alias of Dataset[Row]. While, in Java API, users need to use Dataset<Row> to represent a DataFrame.

DataFrames - are the structured datasets, so as a sample file we will be using not TEXT file but  JSON like this:
{
  "name": "Keeley Bosco",
  "email": "katlyn@jenkinsmaggio.net",
  "city": "Lake Gladysberg",
  "mac": "08:fd:0b:cd:77:f7",
  "timestamp": "2015-04-25 13:57:36 +0700",
  "creditcard": "1228-1221-1221-1431"}
{
  "name": "Rubye Jerde",
  "email": "juvenal@johnston.name",
  "city": null,
  "mac": "90:4d:fa:42:63:a2",
  "timestamp": "2015-04-25 09:02:04 +0700",
  "creditcard": "1228-1221-1221-1431"}



DataFrame example:

package com.demien.sparktest.batch

// https://spark.apache.org/docs/latest/sql-programming-guide.html
import org.apache.spark.sql.SparkSession

object DataFrameExample extends App {

  val spark = SparkSession
    .builder()
    .appName("Spark SQL basic example")
    .config("spark.master", "local")
    .getOrCreate()

  // For implicit conversions like converting RDDs to DataFrames  import spark.implicits._


  val df = spark.read.json("src/main/resources/people.json")
  df.printSchema()
  df.createOrReplaceTempView("people")
  val sqlDF = spark.sql("SELECT * FROM people where email like '%net%' ")
  sqlDF.show()

  case class Person(name: String, email: String, city: String, mac: String, timestamp: String, creditcard: String)

  val peopleDS = spark.read.json("src/main/resources/people.json").as[Person]
  val filteredDS = peopleDS.filter(p => p.email != null && p.email.contains("net"))
  filteredDS.show()


}

We are using SparkSQL to query our structured dataset(DataFrame) for people which have "%net%" in their emails. Also, at the end we are doing the same thing again, but using using DataSet api.
Or cource, in both cases results are the same:

+---------------+-------------------+--------------------+-----------------+----------------+--------------------+
|           city|         creditcard|               email|              mac|            name|           timestamp|
+---------------+-------------------+--------------------+-----------------+----------------+--------------------+
|Lake Gladysberg|1228-1221-1221-1431|katlyn@jenkinsmag...|08:fd:0b:cd:77:f7|    Keeley Bosco|2015-04-25 13:57:...|
|           null|1228-1221-1221-1431|emery_kunze@rogah...|3a:af:c9:0b:5c:08|Celine Ankunding|2015-04-25 14:22:...|
+---------------+-------------------+--------------------+-----------------+----------------+--------------------+


Unfortunatelly, spark is not showing full values, but these email values  are:
"katlyn@jenkinsmaggio.net, "emery_kunze@rogahn.net"

- emails which contain "net".


8. Stream processing: DStream


From official documentation:

Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams. Data can be ingested from many sources like Kafka, Flume, Kinesis, or TCP sockets, and can be processed using complex algorithms expressed with high-level functions like mapreducejoin and window. Finally, processed data can be pushed out to filesystems, databases, and live dashboards. In fact, you can apply Spark’s machine learning and graph processing algorithms on data streams.


8.1 Stream processing: DStream: TextFileCreator

To simulate stream of data, we will create the simple application which is creating text file every 10 seconds. As a source for this file I will be using again text from ApachSpark wiki. 

package com.demien.sparktest

import java.io.FileWriter
import java.util.Date

import scala.io.Source
import scala.util.Random

object TextFileCreator extends App {

  val listOfLines = Source.fromFile("src/main/resources/sample.txt").getLines.toList
  val rnd = new Random()

  while (true) {

    val fileName = new Date().getTime
    val fullFileName = "data/" + fileName + ".txt"    val fw = new FileWriter(fullFileName, true)
    println("writing to " + fullFileName)

    val linesCount = rnd.nextInt(20) + 5    for (i <- 1 to linesCount) fw.write(listOfLines(rnd.nextInt(100)) + "\n")

    fw.close()
    Thread.sleep(10000)

  }

}



8.2 Stream processing: DStream: Streaming example itself

Our application will be monitoring "data" folder for new files. When new file is received - it will be processed. To simulate some statefull activity we will be using function:  specFunc - the point is to constantly calculate count of words. 


package com.demien.sparktest.stream

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}

object DStreamExample extends App {


  val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
  val ssc = new StreamingContext(conf, Seconds(10))
  ssc.checkpoint("spark-checkpoint")

  val lines = ssc.textFileStream("data")
  val words = lines.flatMap(_.split(" "))
  val pairs = words.map(word => (word, 1))
  val wordCounts = pairs.reduceByKey(_ + _)


  def specFunc = (key: String, value: Option[Int], state: State[Int]) => {
    var newState = state.getOption().getOrElse(0)
    var newValue = value.getOrElse(1)
    newState = newState + newValue
    state.update(newState)
    (key, newValue)
  }

  val spec = StateSpec.function(specFunc).timeout(Seconds(30))

  val wordsMapped = wordCounts.mapWithState(spec)

  // top 10  wordsMapped.stateSnapshots().foreachRDD(rdd => {
    rdd.map(e => (e._2, e._1)).sortByKey(false).take(10).foreach(e => print(e._1, e._2))

  })

  ssc.start() // Start the computation  ssc.awaitTermination() // Wait for the computation to terminate
}

8.3. Stream processing: DStream: Execution

Of course, we have to run both: TextFileCreator and DStreamExample.

TextFileCreator is creating files:
writing to data/1524921644177.txt
writing to data/1524921654262.txt
writing to data/1524921664264.txt
writing to data/1524921674266.txt
writing to data/1524921684267.txt
writing to data/1524921694268.txt
writing to data/1524921704270.txt


And DStreamExample is processing them and counting words:

.....
(17,a)(16,Spark)(15,)(14,the)(10,Apache)(10,of)(7,//)(5,is)(5,can)(5,in)
(17,)(17,a)(17,Spark)(14,the)(10,Apache)(10,of)(7,//)(5,is)(5,can)(5,in)
(25,)(24,the)(21,Spark)(21,a)(19,of)(12,Apache)(10,can)(10,as)(10,and)(9,is)
(35,)(27,the)(25,Spark)(25,a)(20,of)(17,and)(14,Apache)(12,as)(12,//)(10,can)
(45,)(27,the)(26,Spark)(25,a)(20,of)(17,and)(14,Apache)(12,as)(12,//)(10,can)

As you can see, results are similar to what we had in RddExample: most popular words are Spark and Apache.



9. The end. 

As you can see, to try Apache Spark you don't need Hadoop/Yarn - it's possible to run it in a standalone mode without all these compicated things. Source code can be downloaded from here.