Saturday, April 28, 2018

Apache Spark - getting started: batch and stream data processing using Scala

0. Intro 

In this post I'm going to explain the basics of Apache Spark: RDD, Spark SQL, DataFrames and Spark Streaming. At its core, Spark is just a library for data processing, so it can be executed without any BigData-related infrastructure: you can run the code from this post without Hadoop/HDFS.


1. What is Apache Spark

From wiki:
Apache Spark is an open-source cluster-computing framework. Originally developed at the University of California, Berkeley's AMPLab, the Spark codebase was later donated to the Apache Software Foundation, which has maintained it since. Spark provides an interface for programming entire clusters with implicit data parallelism and fault tolerance.

From the official documentation:
Apache Spark is a fast and general-purpose cluster computing system. It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs. It also supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Spark Streaming.



As you can see, there is no mention of Hadoop/HDFS at all. Spark CAN work with Hadoop/HDFS, and it CAN work with a cluster resource manager like YARN. But to get familiar with it, it can also be used to process local files in standalone mode.
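For example, the only thing that tells Spark where to run is the "master" setting. A minimal standalone sketch (my own example, the names are made up) which processes a local file could look like this:

import org.apache.spark.{SparkConf, SparkContext}

// "local[*]" means: run inside this JVM using all available cores -
// no Hadoop, HDFS or YARN is involved at all.
object LocalOnlyExample extends App {
  val conf = new SparkConf().setAppName("local-only").setMaster("local[*]")
  val sc = new SparkContext(conf)

  // a plain local file path instead of an HDFS url
  val lines = sc.textFile("src/main/resources/sample.txt")
  println("lines in file: " + lines.count())

  sc.stop()
}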

2. What is Lambda architecture

From wiki:
Lambda architecture is a data-processing architecture designed to handle massive quantities of data by taking advantage of both batch- and stream-processing methods. This approach to architecture attempts to balance latency, throughput, and fault-tolerance by using batch processing to provide comprehensive and accurate views of batch data, while simultaneously using real-time stream processing to provide views of online data. The two view outputs may be joined before presentation. The rise of lambda architecture is correlated with the growth of big data, real-time analytics, and the drive to mitigate the latencies of map-reduce.





In short, lambda architecture is a combination of 2 processing types (a small conceptual sketch follows after the list):
- slow but precise - in this post we will use Spark RDD/DataFrame for it
- fast but not precise - in this post we will use Spark Streaming for it
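
Just to make the idea more concrete, here is a tiny conceptual sketch (my own illustration, not a Spark API): a serving layer merges the precise batch view with the approximate real-time view at query time:

object LambdaSketch extends App {

  // batch layer: slow but precise, recomputed periodically over all the data
  val batchView = Map("spark" -> 100L, "apache" -> 80L)

  // speed layer: fast but approximate, updated incrementally from the stream
  val realtimeView = Map("spark" -> 3L, "streaming" -> 2L)

  // serving layer: combine both views when a query arrives
  val merged = (batchView.keySet ++ realtimeView.keySet)
    .map(key => key -> (batchView.getOrElse(key, 0L) + realtimeView.getOrElse(key, 0L)))
    .toMap

  println(merged) // spark -> 103, apache -> 80, streaming -> 2
}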



3. Project setup: pom.xml

Here I'm using Maven with Scala. In pom.xml I'm adding dependencies for spark-core, spark-sql and spark-streaming.


<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.demien</groupId>
    <artifactId>sparktest</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.3.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.3.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.3.0</version>
        </dependency>


    </dependencies>


</project>




4. Project structure




As you can see, we will be using two types of batch processing, RDD and DataFrame, and one type of stream processing, DStream. Apart from that, we will be using a SparkUtils class for the creation of common Spark entities (SparkContext, SparkSession) and a generator of text files for streaming, TextFileCreator.

5. SparkUtils

As I mentioned before, we will be using the SparkUtils class for the creation of common Spark entities, SparkContext and SparkSession:

package com.demien.sparktest

import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}

object SparkUtils {

  val DEF_APP_NAME = "MySparkApp"
  val DEF_MASTER = "local[2]"

  //The SparkContext object - connection to a Spark execution environment and created RDDs
  def getSparkContext(appName: String, master: String): SparkContext =
    new SparkContext(new SparkConf().setAppName(appName).setMaster(master))

  def getSparkContext(): SparkContext = getSparkContext(DEF_APP_NAME, DEF_MASTER)

  //The SparkSession - connection to DataFrames and SQL
  def getSparkSession(appName: String, master: String): SparkSession = SparkSession
    .builder()
    .appName(appName)
    .master(master)
    .getOrCreate()

  def getSparkSession(): SparkSession = getSparkSession(DEF_APP_NAME, DEF_MASTER)


}
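
A quick usage sketch - the RDD example below obtains its context exactly like this, and a session can be obtained the same way:

val sc = SparkUtils.getSparkContext()      // for working with RDDs
val spark = SparkUtils.getSparkSession()   // for working with DataFrames / Spark SQL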



6. Batch processing: RDD


From official documentation:

The main abstraction Spark provides is a resilient distributed dataset (RDD), which is a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel. RDDs are created by starting with a file in the Hadoop file system (or any other Hadoop-supported file system), or an existing Scala collection in the driver program, and transforming it. Users may also ask Spark to persist an RDD in memory, allowing it to be reused efficiently across parallel operations. Finally, RDDs automatically recover from node failures.
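
Before the word-count example, here is a very small sketch (my own, the names are made up) of two points from the quote above: creating an RDD from an existing Scala collection and persisting it in memory for reuse:

import com.demien.sparktest.SparkUtils

object RddBasicsSketch extends App {
  val sc = SparkUtils.getSparkContext()

  // RDD created from an existing Scala collection in the driver program
  val numbers = sc.parallelize(1 to 1000)

  // persist in memory so it is not recomputed for every action below
  numbers.cache()

  println(numbers.filter(_ % 2 == 0).count()) // 500
  println(numbers.sum())                      // 500500.0

  sc.stop()
}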

RDD Example: 

package com.demien.sparktest.batch

import com.demien.sparktest.SparkUtils
import org.apache.spark.rdd.RDD

// https://spark.apache.org/docs/2.3.0/rdd-programming-guide.html
object RddExample extends App {

  val sc = SparkUtils.getSparkContext()
  val file = sc.textFile("src/main/resources/sample.txt")
  val words: RDD[String] = file.flatMap(l => l.split(" ")).filter(w => w.length > 1)
  val pairs: RDD[(String, Int)] = words.map(s => (s, 1)) // [the, of, the] => (the, 1) (of, 1) (the, 1)
  val counts: RDD[(String, Int)] = pairs.reduceByKey((a, b) => a + b) // (the, 1) (of, 1) (the, 1) => (the, 2) (of, 1)
  val countByWord: RDD[(Int, String)] = counts.map(p => p.swap) // (the, 2) (of, 1) => (2, the) (1, of)
  val countByWordSorted: RDD[(Int, String)] = countByWord.sortByKey(false)
  val top5 = countByWordSorted.take(5)

  top5.foreach(p => println(p))
}

- I added comments and data types for the RDD variables (which, of course, are not needed here) to make it more clear. This RDD example processes a sample text file - it's just the text from the Apache Spark wiki page. We split the text into words, then create for every word a "paired object" holding the word itself and the number 1. After that, we group these pairs using the word as a key and sum up the provided numbers.

Results:
(55,the)
(46,of)
(43,Spark)
(39,and)
(24,in)

- as you can see, the most popular word (excluding "the", "of", "and", "in") in the Spark wiki is "Spark" :)
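
By the way, the same top 5 can be produced in a shorter way. Here is a sketch (mine, using the same sample.txt and SparkUtils) with countByValue(), which does the pairing and the counting in one call and returns a plain Scala Map to the driver:

import com.demien.sparktest.SparkUtils

object RddTop5Alternative extends App {
  val sc = SparkUtils.getSparkContext()

  sc.textFile("src/main/resources/sample.txt")
    .flatMap(_.split(" "))
    .filter(_.length > 1)
    .countByValue()                       // Map[String, Long] collected to the driver
    .toSeq
    .sortBy { case (_, count) => -count } // most frequent words first
    .take(5)
    .foreach(println)

  sc.stop()
}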

7. Batch processing: DataFrame/Spark SQL


From official documentation:

A DataFrame is a Dataset organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources such as: structured data files, tables in Hive, external databases, or existing RDDs. The DataFrame API is available in Scala, Java, Python, and R. In Scala and Java, a DataFrame is represented by a Dataset of Rows. In the Scala API, DataFrame is simply a type alias of Dataset[Row]. While, in Java API, users need to use Dataset<Row> to represent a DataFrame.
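
A tiny sketch (my own names) of that last point: in Scala a DataFrame really is just a Dataset[Row] with named columns, and it can be built straight from a local collection:

import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object DataFrameIsDatasetOfRow extends App {
  val spark = SparkSession.builder().appName("df-intro").master("local[2]").getOrCreate()
  import spark.implicits._ // enables .toDF on local collections

  val df: DataFrame = Seq(("Keeley Bosco", "Lake Gladysberg"), ("Rubye Jerde", null))
    .toDF("name", "city")

  val sameThing: Dataset[Row] = df // compiles because DataFrame is a type alias of Dataset[Row]

  sameThing.printSchema()
  sameThing.show()
  spark.stop()
}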

DataFrames are structured datasets, so as a sample file we will be using not a TEXT file but JSON like this:
{
  "name": "Keeley Bosco",
  "email": "katlyn@jenkinsmaggio.net",
  "city": "Lake Gladysberg",
  "mac": "08:fd:0b:cd:77:f7",
  "timestamp": "2015-04-25 13:57:36 +0700",
  "creditcard": "1228-1221-1221-1431"}
{
  "name": "Rubye Jerde",
  "email": "juvenal@johnston.name",
  "city": null,
  "mac": "90:4d:fa:42:63:a2",
  "timestamp": "2015-04-25 09:02:04 +0700",
  "creditcard": "1228-1221-1221-1431"}



DataFrame example:

package com.demien.sparktest.batch

// https://spark.apache.org/docs/latest/sql-programming-guide.html
import org.apache.spark.sql.SparkSession

object DataFrameExample extends App {

  val spark = SparkSession
    .builder()
    .appName("Spark SQL basic example")
    .config("spark.master", "local")
    .getOrCreate()

  // For implicit conversions like converting RDDs to DataFrames
  import spark.implicits._


  val df = spark.read.json("src/main/resources/people.json")
  df.printSchema()
  df.createOrReplaceTempView("people")
  val sqlDF = spark.sql("SELECT * FROM people where email like '%net%' ")
  sqlDF.show()

  case class Person(name: String, email: String, city: String, mac: String, timestamp: String, creditcard: String)

  val peopleDS = spark.read.json("src/main/resources/people.json").as[Person]
  val filteredDS = peopleDS.filter(p => p.email != null && p.email.contains("net"))
  filteredDS.show()


}

We are using Spark SQL to query our structured dataset (DataFrame) for people who have "%net%" in their emails. At the end we are doing the same thing again, but using the Dataset API.
Of course, in both cases the results are the same:

+---------------+-------------------+--------------------+-----------------+----------------+--------------------+
|           city|         creditcard|               email|              mac|            name|           timestamp|
+---------------+-------------------+--------------------+-----------------+----------------+--------------------+
|Lake Gladysberg|1228-1221-1221-1431|katlyn@jenkinsmag...|08:fd:0b:cd:77:f7|    Keeley Bosco|2015-04-25 13:57:...|
|           null|1228-1221-1221-1431|emery_kunze@rogah...|3a:af:c9:0b:5c:08|Celine Ankunding|2015-04-25 14:22:...|
+---------------+-------------------+--------------------+-----------------+----------------+--------------------+


Unfortunately, Spark is not showing the full values here, but these email values are:
"katlyn@jenkinsmaggio.net", "emery_kunze@rogahn.net"

- emails which contain "net".
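
If you want to see them untruncated, show() accepts a truncate flag. A short sketch (mine, same people.json) which also filters with the DataFrame API instead of SQL:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object ShowFullEmails extends App {
  val spark = SparkSession.builder().appName("show-full").master("local[2]").getOrCreate()

  val byEmail = spark.read.json("src/main/resources/people.json")
    .filter(col("email").contains("net")) // same condition as the SQL "like '%net%'"

  byEmail.select("email").show(truncate = false) // full values, no "..." cut-off
  spark.stop()
}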


8. Stream processing: DStream


From official documentation:

Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams. Data can be ingested from many sources like Kafka, Flume, Kinesis, or TCP sockets, and can be processed using complex algorithms expressed with high-level functions like map, reduce, join and window. Finally, processed data can be pushed out to filesystems, databases, and live dashboards. In fact, you can apply Spark’s machine learning and graph processing algorithms on data streams.


8.1 Stream processing: DStream: TextFileCreator

To simulate a stream of data, we will create a simple application which creates a text file every 10 seconds. As a source for this file I will again be using the text from the Apache Spark wiki.

package com.demien.sparktest

import java.io.FileWriter
import java.util.Date

import scala.io.Source
import scala.util.Random

object TextFileCreator extends App {

  val listOfLines = Source.fromFile("src/main/resources/sample.txt").getLines.toList
  val rnd = new Random()

  while (true) {

    val fileName = new Date().getTime
    val fullFileName = "data/" + fileName + ".txt"
    val fw = new FileWriter(fullFileName, true)
    println("writing to " + fullFileName)

    val linesCount = rnd.nextInt(20) + 5
    for (i <- 1 to linesCount) fw.write(listOfLines(rnd.nextInt(100)) + "\n")

    fw.close()
    Thread.sleep(10000)

  }

}



8.2 Stream processing: DStream: Streaming example itself

Our application will be monitoring the "data" folder for new files. When a new file is received, it will be processed. To simulate some stateful activity we will be using the function specFunc - the point is to continuously maintain a running count of words.


package com.demien.sparktest.stream

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}

object DStreamExample extends App {


  val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
  val ssc = new StreamingContext(conf, Seconds(10))
  ssc.checkpoint("spark-checkpoint")

  val lines = ssc.textFileStream("data")
  val words = lines.flatMap(_.split(" "))
  val pairs = words.map(word => (word, 1))
  val wordCounts = pairs.reduceByKey(_ + _)


  def specFunc = (key: String, value: Option[Int], state: State[Int]) => {
    var newState = state.getOption().getOrElse(0)
    var newValue = value.getOrElse(1)
    newState = newState + newValue
    state.update(newState)
    (key, newValue)
  }

  val spec = StateSpec.function(specFunc).timeout(Seconds(30))

  val wordsMapped = wordCounts.mapWithState(spec)

  // top 10
  wordsMapped.stateSnapshots().foreachRDD(rdd => {
    rdd.map(e => (e._2, e._1)).sortByKey(false).take(10).foreach(e => print(e._1, e._2))

  })

  ssc.start() // Start the computation
  ssc.awaitTermination() // Wait for the computation to terminate
}
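
For comparison, the running totals could also be kept with the older updateStateByKey API. Here is a sketch (mine, not from the original project) of the same stateful word count written that way:

package com.demien.sparktest.stream

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DStreamUpdateStateExample extends App {

  val conf = new SparkConf().setMaster("local[2]").setAppName("UpdateStateWordCount")
  val ssc = new StreamingContext(conf, Seconds(10))
  ssc.checkpoint("spark-checkpoint") // required for any stateful operation

  val wordCounts = ssc.textFileStream("data")
    .flatMap(_.split(" "))
    .map(word => (word, 1))
    .reduceByKey(_ + _)

  // one running Int total per word; newValues holds the counts from the current batch
  val runningCounts = wordCounts.updateStateByKey[Int] { (newValues: Seq[Int], running: Option[Int]) =>
    Some(running.getOrElse(0) + newValues.sum)
  }

  runningCounts.print() // prints a few (word, total) pairs every batch

  ssc.start()
  ssc.awaitTermination()
}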

8.3. Stream processing: DStream: Execution

Of course, we have to run both: TextFileCreator and DStreamExample.

TextFileCreator is creating files:
writing to data/1524921644177.txt
writing to data/1524921654262.txt
writing to data/1524921664264.txt
writing to data/1524921674266.txt
writing to data/1524921684267.txt
writing to data/1524921694268.txt
writing to data/1524921704270.txt


And DStreamExample is processing them and counting words:

.....
(17,a)(16,Spark)(15,)(14,the)(10,Apache)(10,of)(7,//)(5,is)(5,can)(5,in)
(17,)(17,a)(17,Spark)(14,the)(10,Apache)(10,of)(7,//)(5,is)(5,can)(5,in)
(25,)(24,the)(21,Spark)(21,a)(19,of)(12,Apache)(10,can)(10,as)(10,and)(9,is)
(35,)(27,the)(25,Spark)(25,a)(20,of)(17,and)(14,Apache)(12,as)(12,//)(10,can)
(45,)(27,the)(26,Spark)(25,a)(20,of)(17,and)(14,Apache)(12,as)(12,//)(10,can)

As you can see, the results are similar to what we had in RddExample: the most popular words (apart from common ones like "the" and "a") are Spark and Apache.



9. The end. 

As you can see, to try Apache Spark you don't need Hadoop/YARN - it's possible to run it in standalone mode without all these complicated things. The source code can be downloaded from here.