
Monday, July 27, 2020

Kafka and Spring Boot (Spring Cloud Stream)

0. Intro

There are several ways to work with Kafka. The default is the official Kafka client: we create a Kafka Producer and a Kafka Consumer ourselves, and after that we're ready to go. I had a post on this topic here. But if we're using Spring Boot, there is a much easier option: Spring Cloud Stream. It is a higher-level approach compared to the native Kafka client. 

For this demo I created, in the traditional way, two applications: a Kafka Producer and a Kafka Consumer. 
Let's start! 


1. pom.xml 

These files are almost identical for the two projects. The only difference is the artifact name. 



<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.2.RELEASE</version>
<relativePath/>
</parent>
<groupId>com.demien</groupId>
<artifactId>spboot-kafka-producer</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>spboot-kafka-producer</name>
<description>Demo project for Spring Boot and Kafka</description>

<properties>
<java.version>11</java.version>
<spring-cloud.version>Hoxton.SR6</spring-cloud.version>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-support</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>

</project>




2. Producer application

This application sends messages to Kafka. 

2.1. Entity 

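Let's start with the entity class that we will be sending to Kafka. I used Lombok to keep it simple: 
package com.demien.spbootkafkaproducer.domain.model;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
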
@Getter
@Setter
@AllArgsConstructor
@ToString
public class TestEntity {
private String id;
private String name;
private String description;
}



2.2. Channel

OK, we have an entity to send; now we have to configure a channel for sending it: 
package com.demien.spbootkafkaproducer.domain.message;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface TestMessageChannels {
@Output("myTestChannel")
MessageChannel myTestChannel();
}



- As you can see, it's just an interface: Spring will create an implementation for it at runtime. We only have to define the "id" of this channel, which is "myTestChannel" in this particular case. 
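To make "an implementation created at runtime" less magical, here is a minimal sketch of the general mechanism using a JDK dynamic proxy. It is only an illustration and not Spring's actual code: DemoChannel, DemoChannels and RuntimeChannelsDemo are hypothetical names invented for this example, and the "channel" simply records what was sent instead of talking to Kafka.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins, NOT Spring types: a minimal channel and an interface declaring channels.
interface DemoChannel {
    boolean send(String payload);
}

interface DemoChannels {
    DemoChannel myTestChannel();
}

final class RuntimeChannelsDemo {

    // Everything "sent" through a channel is recorded here as "channelId:payload".
    static final List<String> SENT = new ArrayList<>();

    // Builds an implementation of DemoChannels at runtime; the method name is used as the channel id.
    // Object methods such as toString() are not handled in this sketch.
    static DemoChannels createChannels() {
        InvocationHandler handler = (proxy, method, args) -> {
            String channelId = method.getName();
            DemoChannel channel = payload -> SENT.add(channelId + ":" + payload);
            return channel;
        };
        return (DemoChannels) Proxy.newProxyInstance(
                DemoChannels.class.getClassLoader(),
                new Class<?>[] {DemoChannels.class},
                handler);
    }

    public static void main(String[] args) {
        DemoChannels channels = createChannels();
        channels.myTestChannel().send("hello");
        System.out.println(SENT); // prints [myTestChannel:hello]
    }
}
```

Nobody wrote a class implementing DemoChannels, yet the call works; the framework does something comparable for the annotated interface and additionally binds the channel to a message broker.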

2.3. Sender service

Now we can use our channel to send messages: we enable bindings for it, autowire it, and we're ready to go. 

package com.demien.spbootkafkaproducer.domain.service;

import com.demien.spbootkafkaproducer.domain.message.TestMessageChannels;
import com.demien.spbootkafkaproducer.domain.model.TestEntity;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.messaging.Message;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.stereotype.Service;

import java.util.logging.Logger;

@Service("testProducerService")
@EnableBinding(TestMessageChannels.class)
public class TestProducerService {

private final static Logger log = Logger.getLogger(TestProducerService.class.getName());

private final TestMessageChannels testMessageChannels;

@Autowired
public TestProducerService(TestMessageChannels testMessageChannels) {
this.testMessageChannels = testMessageChannels;
}

public void sendTestMessages() {
for (int i=0; i<10; i++) {
var idString = Integer.toString(i);
var testEntity = new TestEntity(idString, "Name " + idString, "Description " + idString);
log.info("Sending to kafka entitiy: " + testEntity);
final Message<TestEntity> message = MessageBuilder.withPayload(testEntity).build();
testMessageChannels.myTestChannel().send(message);
}
}
} 

Not complicated, right? We just wrap our test entity in a Message object and send it to the channel. 

2.4. Application runner

And now we can run our application, let Spring create our service (and autowire the channel), and use it: 

package com.demien.spbootkafkaproducer;

import com.demien.spbootkafkaproducer.domain.service.TestProducerService;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;

@SpringBootApplication
public class SpbootKafkaProducerApplication {

public static void main(String[] args) {
final var applicationContext = SpringApplication.run(SpbootKafkaProducerApplication.class, args);
new SpbootKafkaProducerApplication().run(applicationContext);
}

public void run(ApplicationContext applicationContext) {
TestProducerService testProducerService = applicationContext.getBean(TestProducerService.class);
testProducerService.sendTestMessages();
}

}

  

2.5. Application Config

Remember that in section 2.2 we defined a channel with the id "myTestChannel"? Now we have to point it to a Kafka topic and, basically, configure Kafka as well. This is done in the application config file. The content below is YAML, so it belongs in application.yml under the resources folder:

spring:
  application:
    name: test-producer
  cloud:
    stream:
      bindings:
        myTestChannel:
          destination: myTestTopic
      kafka:
        binder:
          zkNodes: localhost
          brokers: localhost


- Thus, we will be sending data to the channel with id "myTestChannel", which is linked to the Kafka topic "myTestTopic".  



3. Consumer Application

The Consumer receives messages from Kafka. To keep the consumer simple I omitted the entity class: we will just print the JSON representations of the entities received from Kafka. 

3.1. Input Channel

In a similar way: 
package com.demien.spbootkafkaconsumer.domain.message;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.MessageChannel;

public interface TestMessageChannels {
@Input("myTestChannel")
MessageChannel myTestChannel();
} 


3.2. Consumer Service

Again in a similar way, but in addition we have to annotate a method as a "Stream Listener":

package com.demien.spbootkafkaconsumer.domain.service;

import com.demien.spbootkafkaconsumer.domain.message.TestMessageChannels;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Service;

import java.util.logging.Logger;

@Service
@EnableBinding(TestMessageChannels.class)
public class TestConsumerService {

private final static Logger log = Logger.getLogger(TestConsumerService.class.getName());

@StreamListener("myTestChannel")
public void consumeTestEntity(byte[] testEntity) {
log.info("Received from kafka: " + new String(testEntity));
}
}

- As you can see, we define a listener for our channel, and it will be triggered every time a message is sent to the Kafka topic linked to this channel. Of course, in real life we would convert the JSON into our entity classes here. 
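As a rough illustration of that conversion step, the sketch below turns the raw bytes into a map of fields using only the JDK. It is a toy under loud assumptions: it only handles flat JSON objects with string values and no escaped quotes (which is what our demo payloads look like), and FlatJsonPayloadDemo is a hypothetical name. In a real application a proper JSON library should do this job.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class FlatJsonPayloadDemo {

    // Matches "key":"value" pairs. Assumption: flat object, string values, no escaped quotes.
    private static final Pattern FIELD =
            Pattern.compile("\"([^\"]+)\"\\s*:\\s*\"([^\"]*)\"");

    // Decodes the payload as UTF-8 and collects the fields in the order they appear.
    static Map<String, String> parseFlatJson(byte[] payload) {
        String json = new String(payload, StandardCharsets.UTF_8);
        Map<String, String> fields = new LinkedHashMap<>();
        Matcher matcher = FIELD.matcher(json);
        while (matcher.find()) {
            fields.put(matcher.group(1), matcher.group(2));
        }
        return fields;
    }

    public static void main(String[] args) {
        byte[] payload = "{\"id\":\"0\",\"name\":\"Name 0\",\"description\":\"Description 0\"}"
                .getBytes(StandardCharsets.UTF_8);
        System.out.println(parseFlatJson(payload));
        // prints {id=0, name=Name 0, description=Description 0}
    }
}
```

From such a map it is one constructor call to rebuild an entity with the same three fields as on the producer side.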

3.3. Application runner

It's even simpler than it was for the Producer: 

package com.demien.spbootkafkaconsumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SpbootKafkaConsumerApplication {

public static void main(String[] args) {
SpringApplication.run(SpbootKafkaConsumerApplication.class, args);
}

}

3.4. Application configuration

And again we have to link our channel to the Kafka topic. 

spring:
  application:
    name: test-producer
  cloud:
    stream:
      bindings:
        myTestChannel:
          destination: myTestTopic
          consumer:
            resetOffsets: true
          group: myTestConsumerGroup
      kafka:
        binder:
          zkNodes: localhost
          brokers: localhost




4. Running

First we have to run the Consumer: 

java -jar target/spboot-kafka-consumer-0.0.1-SNAPSHOT.jar


And once it has started, the Producer: 

java -jar target/spboot-kafka-producer-0.0.1-SNAPSHOT.jar


The Producer should print something like this: 

2020-07-27 21:49:44.776  INFO 14016 --- [           main] c.d.s.d.service.TestProducerService      : Sending to kafka entitiy: TestEntity(id=1, name=Name 1, description=Description 1)

2020-07-27 21:49:44.788  INFO 14016 --- [           main] c.d.s.d.service.TestProducerService      : Sending to kafka entitiy: TestEntity(id=2, name=Name 2, description=Description 2)

2020-07-27 21:49:44.789  INFO 14016 --- [           main] c.d.s.d.service.TestProducerService      : Sending to kafka entitiy: TestEntity(id=3, name=Name 3, description=Description 3)

2020-07-27 21:49:44.790  INFO 14016 --- [           main] c.d.s.d.service.TestProducerService      : Sending to kafka entitiy: TestEntity(id=4, name=Name 4, description=Description 4)

2020-07-27 21:49:44.791  INFO 14016 --- [           main] c.d.s.d.service.TestProducerService      : Sending to kafka entitiy: TestEntity(id=5, name=Name 5, description=Description 5)

2020-07-27 21:49:44.793  INFO 14016 --- [           main] c.d.s.d.service.TestProducerService      : Sending to kafka entitiy: TestEntity(id=6, name=Name 6, description=Description 6)

2020-07-27 21:49:44.794  INFO 14016 --- [           main] c.d.s.d.service.TestProducerService      : Sending to kafka entitiy: TestEntity(id=7, name=Name 7, description=Description 7)

2020-07-27 21:49:44.795  INFO 14016 --- [           main] c.d.s.d.service.TestProducerService      : Sending to kafka entitiy: TestEntity(id=8, name=Name 8, description=Description 8)

2020-07-27 21:49:44.796  INFO 14016 --- [           main] c.d.s.d.service.TestProducerService      : Sending to kafka entitiy: TestEntity(id=9, name=Name 9, description=Description 9)




And the Consumer: 

2020-07-27 21:47:46.351  INFO 13839 --- [container-0-C-1] c.d.s.d.service.TestConsumerService      : Received from kafka: {"id":"0","name":"Name 0","description":"Description 0"}

2020-07-27 21:47:46.365  INFO 13839 --- [container-0-C-1] c.d.s.d.service.TestConsumerService      : Received from kafka: {"id":"1","name":"Name 1","description":"Description 1"}

2020-07-27 21:47:46.367  INFO 13839 --- [container-0-C-1] c.d.s.d.service.TestConsumerService      : Received from kafka: {"id":"2","name":"Name 2","description":"Description 2"}

2020-07-27 21:47:46.367  INFO 13839 --- [container-0-C-1] c.d.s.d.service.TestConsumerService      : Received from kafka: {"id":"3","name":"Name 3","description":"Description 3"}

2020-07-27 21:47:46.369  INFO 13839 --- [container-0-C-1] c.d.s.d.service.TestConsumerService      : Received from kafka: {"id":"4","name":"Name 4","description":"Description 4"}

2020-07-27 21:47:46.369  INFO 13839 --- [container-0-C-1] c.d.s.d.service.TestConsumerService      : Received from kafka: {"id":"5","name":"Name 5","description":"Description 5"}

2020-07-27 21:47:46.370  INFO 13839 --- [container-0-C-1] c.d.s.d.service.TestConsumerService      : Received from kafka: {"id":"6","name":"Name 6","description":"Description 6"}

2020-07-27 21:47:46.371  INFO 13839 --- [container-0-C-1] c.d.s.d.service.TestConsumerService      : Received from kafka: {"id":"7","name":"Name 7","description":"Description 7"}

2020-07-27 21:47:46.372  INFO 13839 --- [container-0-C-1] c.d.s.d.service.TestConsumerService      : Received from kafka: {"id":"8","name":"Name 8","description":"Description 8"}

2020-07-27 21:47:46.372  INFO 13839 --- [container-0-C-1] c.d.s.d.service.TestConsumerService      : Received from kafka: {"id":"9","name":"Name 9","description":"Description 9"}


It works! :)



5. The end

As you can see, with Spring Boot we can interact with Kafka at a much higher level, which makes it simpler. 
Source code can be downloaded from here



Saturday, April 28, 2018

Apache Spark - getting started: batch and stream data processing using Scala

0. Intro 

In this post I'm going to explain the basics of Apache Spark: RDD, SparkSQL, DataFrames, SparkStreaming. Spark is really just a library for data processing, so it can run without any BigData-related infrastructure. You can run the code from this post without any Hadoop/HDFS.


1. What is Apache Spark

From wiki:
Apache Spark is an open-source cluster-computing framework. Originally developed at the University of California, Berkeley's AMPLab, the Spark codebase was later donated to the Apache Software Foundation, which has maintained it since. Spark provides an interface for programming entire clusters with implicit data parallelism and fault tolerance.

From official documentation:
Apache Spark is a fast and general-purpose cluster computing system. It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs. It also supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Spark Streaming.



As you can see, there is no mention of Hadoop/HDFS at all. It CAN work with Hadoop/HDFS, and it CAN work with a cluster resource manager like YARN. But to get familiar with it, it can also be used to process local files on a single machine in local mode.

2. What is Lambda architecture

From wiki:
Lambda architecture is a data-processing architecture designed to handle massive quantities of data by taking advantage of both batch- and stream-processing methods. This approach to architecture attempts to balance latency, throughput, and fault-tolerance by using batch processing to provide comprehensive and accurate views of batch data, while simultaneously using real-time stream processing to provide views of online data. The two view outputs may be joined before presentation. The rise of lambda architecture is correlated with the growth of big data, real-time analytics, and the drive to mitigate the latencies of map-reduce.[1]





In short, lambda architecture is a combination of 2 processing types:
- slow but precise: in this post we will use Spark RDD/DataFrame for it
- fast but not precise: in this post we will use Spark Streaming for it
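The "two view outputs may be joined before presentation" part can be pictured with a few lines of plain Java. This is only a sketch of the idea, not Spark code: LambdaViewsDemo and mergeViews are hypothetical names, and the numbers are made up for illustration. The batch view holds counts that are complete up to the last batch run, the speed view holds counts for data that arrived afterwards, and a query adds them up.

```java
import java.util.HashMap;
import java.util.Map;

final class LambdaViewsDemo {

    // Joins the batch view (complete up to the last batch run) with the
    // speed view (counts for events that arrived since that run).
    static Map<String, Long> mergeViews(Map<String, Long> batchView, Map<String, Long> speedView) {
        Map<String, Long> merged = new HashMap<>(batchView);
        speedView.forEach((key, count) -> merged.merge(key, count, Long::sum));
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Long> batchView = new HashMap<>();
        batchView.put("Spark", 40L); // illustrative numbers only
        batchView.put("Apache", 10L);

        Map<String, Long> speedView = new HashMap<>();
        speedView.put("Spark", 2L);
        speedView.put("Kafka", 1L);

        Map<String, Long> merged = mergeViews(batchView, speedView);
        System.out.println(merged.get("Spark")); // prints 42
        System.out.println(merged.get("Kafka")); // prints 1
    }
}
```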



3. Project setup: pom.xml

Here I'm using Maven with Scala. In pom.xml I'm adding dependencies for spark-core, spark-sql, spark-streaming.


<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.demien</groupId>
    <artifactId>sparktest</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.3.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.3.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.3.0</version>
        </dependency>


    </dependencies>


</project>




4. Project structure




As you can see, we will be using 2 types of batch processing: RDD and DataFrame, and one type of stream processing: DStream. Apart from that, we will use a SparkUtils class to create common Spark entities (SparkContext, SparkSession) and TextFileCreator, a generator of text files for streaming.

5. SparkUtils

As I mentioned before, we will use the SparkUtils class to create common Spark entities: SparkContext, SparkSession:

package com.demien.sparktest

import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}

object SparkUtils {

  val DEF_APP_NAME = "MySparkApp"
  val DEF_MASTER = "local[2]"

  // The SparkContext object - connection to a Spark execution environment, used to create RDDs
  def getSparkContext(appName: String, master: String): SparkContext = new SparkContext(new SparkConf().setAppName(appName).setMaster(master))

  def getSparkContext(): SparkContext = getSparkContext(DEF_APP_NAME, DEF_MASTER)

  // The SparkSession - connection to DataFrames and SQL
  def getSparkSession(appName: String, master: String): SparkSession = SparkSession
    .builder()
    .appName(appName)
    .master(master)
    .getOrCreate()

  def getSparkSession(): SparkSession = getSparkSession(DEF_APP_NAME, DEF_MASTER)


}



6. Batch processing: RDD


From official documentation:

The main abstraction Spark provides is a resilient distributed dataset (RDD), which is a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel. RDDs are created by starting with a file in the Hadoop file system (or any other Hadoop-supported file system), or an existing Scala collection in the driver program, and transforming it. Users may also ask Spark to persist an RDD in memory, allowing it to be reused efficiently across parallel operations. Finally, RDDs automatically recover from node failures.

RDD Example: 

package com.demien.sparktest.batch

import com.demien.sparktest.SparkUtils
import org.apache.spark.rdd.RDD

// https://spark.apache.org/docs/2.3.0/rdd-programming-guide.html
object RddExample extends App {

  val sc = SparkUtils.getSparkContext()
  val file = sc.textFile("src/main/resources/sample.txt")
  val words: RDD[String] = file.flatMap(l => l.split(" ")).filter(w => w.length > 1)
  val pairs: RDD[(String, Int)] = words.map(s => (s, 1)) // [the, of, the] => (the, 1) (of, 1) (the, 1)
  val counts: RDD[(String, Int)] = pairs.reduceByKey((a, b) => a + b) // (the, 1) (of, 1) (the, 1) => (the, 2) (of, 1)
  val countByWord: RDD[(Int, String)] = counts.map(p => p.swap) // (the, 2) (of, 1) => (2, the) (1, of)
  val countByWordSorted: RDD[(Int, String)] = countByWord.sortByKey(false)
  val top5 = countByWordSorted.take(5)

  top5.foreach(p => println(p))
}

- I added comments and data types for the RDD variables (which of course are not needed here) to make it clearer. This RDD example processes a sample text file: it's just the text from the Apache Spark wiki. We split the text into words and create for every word a "paired object" with the word itself and the number 1. After that, we group these pairs using the word as a key and sum the numbers.

Results:
(55,the)
(46,of)
(43,Spark)
(39,and)
(24,in)

- As you can see, the most popular word (excluding "the", "of", "and", "in") in the Spark wiki is "Spark" :)
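For readers who want to follow the same pipeline without Spark, here is a sketch of the steps (split into words, drop one-character tokens, count per word, sort by count, take the top N) written with plain Java streams. It is an in-memory analogy and not the Spark API: WordCountDemo and topWords are hypothetical names, and the alphabetical tie-break is my addition to keep the output deterministic.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

final class WordCountDemo {

    // Returns the `limit` most frequent words with their counts, most frequent first.
    static List<Map.Entry<String, Long>> topWords(List<String> lines, int limit) {
        Map<String, Long> counts = lines.stream()
                .flatMap(line -> Arrays.stream(line.split(" "))) // lines -> words
                .filter(word -> word.length() > 1)               // same filter as in RddExample
                .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));

        return counts.entrySet().stream()
                .sorted(Map.Entry.<String, Long>comparingByValue(Comparator.reverseOrder())
                        .thenComparing(Map.Entry.<String, Long>comparingByKey())) // ties: alphabetical
                .limit(limit)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("the quick fox", "the lazy dog a", "the fox");
        System.out.println(topWords(lines, 2)); // prints [the=3, fox=2]
    }
}
```

The grouping collector plays the role of map plus reduceByKey, and the sort on the entry value replaces the swap plus sortByKey(false) trick from the RDD version.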

7. Batch processing: DataFrame/Spark SQL


From official documentation:

A DataFrame is a Dataset organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources such as: structured data files, tables in Hive, external databases, or existing RDDs. The DataFrame API is available in Scala, Java, Python, and R. In Scala and Java, a DataFrame is represented by a Dataset of Rows. In the Scala API, DataFrame is simply a type alias of Dataset[Row]. While, in Java API, users need to use Dataset<Row> to represent a DataFrame.

DataFrames are structured datasets, so as a sample file we will use not a TEXT file but a JSON file like this:
{
  "name": "Keeley Bosco",
  "email": "katlyn@jenkinsmaggio.net",
  "city": "Lake Gladysberg",
  "mac": "08:fd:0b:cd:77:f7",
  "timestamp": "2015-04-25 13:57:36 +0700",
  "creditcard": "1228-1221-1221-1431"}
{
  "name": "Rubye Jerde",
  "email": "juvenal@johnston.name",
  "city": null,
  "mac": "90:4d:fa:42:63:a2",
  "timestamp": "2015-04-25 09:02:04 +0700",
  "creditcard": "1228-1221-1221-1431"}



DataFrame example:

package com.demien.sparktest.batch

// https://spark.apache.org/docs/latest/sql-programming-guide.html
import org.apache.spark.sql.SparkSession

object DataFrameExample extends App {

  val spark = SparkSession
    .builder()
    .appName("Spark SQL basic example")
    .config("spark.master", "local")
    .getOrCreate()

  // For implicit conversions like converting RDDs to DataFrames
  import spark.implicits._


  val df = spark.read.json("src/main/resources/people.json")
  df.printSchema()
  df.createOrReplaceTempView("people")
  val sqlDF = spark.sql("SELECT * FROM people where email like '%net%' ")
  sqlDF.show()

  case class Person(name: String, email: String, city: String, mac: String, timestamp: String, creditcard: String)

  val peopleDS = spark.read.json("src/main/resources/people.json").as[Person]
  val filteredDS = peopleDS.filter(p => p.email != null && p.email.contains("net"))
  filteredDS.show()


}

We are using SparkSQL to query our structured dataset (DataFrame) for people who have "net" in their emails. At the end we do the same thing again, but using the Dataset API.
Of course, in both cases the results are the same:

+---------------+-------------------+--------------------+-----------------+----------------+--------------------+
|           city|         creditcard|               email|              mac|            name|           timestamp|
+---------------+-------------------+--------------------+-----------------+----------------+--------------------+
|Lake Gladysberg|1228-1221-1221-1431|katlyn@jenkinsmag...|08:fd:0b:cd:77:f7|    Keeley Bosco|2015-04-25 13:57:...|
|           null|1228-1221-1221-1431|emery_kunze@rogah...|3a:af:c9:0b:5c:08|Celine Ankunding|2015-04-25 14:22:...|
+---------------+-------------------+--------------------+-----------------+----------------+--------------------+


Unfortunately, Spark is not showing the full values (show() truncates long strings by default), but these email values are:
"katlyn@jenkinsmaggio.net", "emery_kunze@rogahn.net"

- emails which contain "net".


8. Stream processing: DStream


From official documentation:

Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams. Data can be ingested from many sources like Kafka, Flume, Kinesis, or TCP sockets, and can be processed using complex algorithms expressed with high-level functions like map, reduce, join and window. Finally, processed data can be pushed out to filesystems, databases, and live dashboards. In fact, you can apply Spark’s machine learning and graph processing algorithms on data streams.


8.1. Stream processing: DStream: TextFileCreator

To simulate a stream of data, we will create a simple application that writes a new text file every 10 seconds. As the source for these files I will again use the text from the Apache Spark wiki. 

package com.demien.sparktest

import java.io.FileWriter
import java.util.Date

import scala.io.Source
import scala.util.Random

object TextFileCreator extends App {

  val listOfLines = Source.fromFile("src/main/resources/sample.txt").getLines.toList
  val rnd = new Random()

  while (true) {

    val fileName = new Date().getTime
    val fullFileName = "data/" + fileName + ".txt"
    val fw = new FileWriter(fullFileName, true)
    println("writing to " + fullFileName)

    val linesCount = rnd.nextInt(20) + 5
    // Pick random lines from the sample; the index is bounded by the actual number of lines
    for (i <- 1 to linesCount) fw.write(listOfLines(rnd.nextInt(listOfLines.size)) + "\n")

    fw.close()
    Thread.sleep(10000)

  }

}



8.2. Stream processing: DStream: Streaming example itself

Our application will be monitoring the "data" folder for new files. When a new file arrives, it will be processed. To simulate some stateful activity we use the function specFunc: the point is to keep a running count of words across batches. 


package com.demien.sparktest.stream

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}

object DStreamExample extends App {


  val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
  val ssc = new StreamingContext(conf, Seconds(10))
  ssc.checkpoint("spark-checkpoint")

  val lines = ssc.textFileStream("data")
  val words = lines.flatMap(_.split(" "))
  val pairs = words.map(word => (word, 1))
  val wordCounts = pairs.reduceByKey(_ + _)


  def specFunc = (key: String, value: Option[Int], state: State[Int]) => {
    var newState = state.getOption().getOrElse(0)
    var newValue = value.getOrElse(1)
    newState = newState + newValue
    state.update(newState)
    (key, newValue)
  }

  val spec = StateSpec.function(specFunc).timeout(Seconds(30))

  val wordsMapped = wordCounts.mapWithState(spec)

  // top 10
  wordsMapped.stateSnapshots().foreachRDD(rdd => {
    rdd.map(e => (e._2, e._1)).sortByKey(false).take(10).foreach(e => print(e._1, e._2))

  })

  ssc.start() // Start the computation
  ssc.awaitTermination() // Wait for the computation to terminate
}
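The stateful part of this example, keeping a running total per word while batches keep arriving, can be sketched in plain Java as follows. This is an analogy and not Spark code: RunningWordCountDemo, processBatch and snapshot are hypothetical names, each call to processBatch stands for one 10-second micro-batch, and the 30-second state timeout from the listing is not modelled.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class RunningWordCountDemo {

    // State kept between batches: word -> total count seen so far.
    private final Map<String, Integer> state = new HashMap<>();

    // Counts the words of one micro-batch, then folds those counts into the state.
    void processBatch(List<String> lines) {
        Map<String, Integer> batchCounts = new HashMap<>();
        for (String line : lines) {
            for (String word : line.split(" ")) {
                batchCounts.merge(word, 1, Integer::sum); // per-batch count (reduceByKey step)
            }
        }
        // State update step: previous total plus this batch's count.
        batchCounts.forEach((word, count) -> state.merge(word, count, Integer::sum));
    }

    // Returns a copy of the current totals, comparable in spirit to a state snapshot.
    Map<String, Integer> snapshot() {
        return new HashMap<>(state);
    }

    public static void main(String[] args) {
        RunningWordCountDemo demo = new RunningWordCountDemo();
        demo.processBatch(Arrays.asList("Spark is fast"));
        demo.processBatch(Arrays.asList("Spark and Kafka", "Spark again"));
        System.out.println(demo.snapshot().get("Spark")); // prints 3
    }
}
```

The totals only ever grow from batch to batch, which is the same pattern visible in the output of the streaming job.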

8.3. Stream processing: DStream: Execution

Of course, we have to run both: TextFileCreator and DStreamExample.

TextFileCreator is creating files:
writing to data/1524921644177.txt
writing to data/1524921654262.txt
writing to data/1524921664264.txt
writing to data/1524921674266.txt
writing to data/1524921684267.txt
writing to data/1524921694268.txt
writing to data/1524921704270.txt


And DStreamExample is processing them and counting words:

.....
(17,a)(16,Spark)(15,)(14,the)(10,Apache)(10,of)(7,//)(5,is)(5,can)(5,in)
(17,)(17,a)(17,Spark)(14,the)(10,Apache)(10,of)(7,//)(5,is)(5,can)(5,in)
(25,)(24,the)(21,Spark)(21,a)(19,of)(12,Apache)(10,can)(10,as)(10,and)(9,is)
(35,)(27,the)(25,Spark)(25,a)(20,of)(17,and)(14,Apache)(12,as)(12,//)(10,can)
(45,)(27,the)(26,Spark)(25,a)(20,of)(17,and)(14,Apache)(12,as)(12,//)(10,can)

As you can see, the results are similar to what we had in RddExample: the most popular words are Spark and Apache.



9. The end. 

As you can see, to try Apache Spark you don't need Hadoop/YARN: it's possible to run it on a single machine in local mode without all these complicated things. Source code can be downloaded from here.