Wednesday, October 16, 2019

PySpark getting started

In this post I'm going to show the first steps for working with two components of PySpark: RDDs and DataFrames.

To start working with PySpark we have two options:
-  the Python Spark shell from the Spark distribution
-  setting up our own dev environment

Let's take a closer look at both of them.

Option 1: Spark-shell

The simplest way to play with PySpark is the Python Spark shell. First, download Spark from the official web page (https://spark.apache.org/downloads.html). Next, unpack it and run "pyspark" from the "bin" folder. You should see something like this:

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.4.4
      /_/

Using Python version 2.7.15+ (default, Oct  7 2019 17:39:04)
SparkSession available as 'spark'.


And we're ready to go! Let's try some basic operations:


>>> rdd1 = spark.sparkContext.parallelize([('Joe', 1980), ('Huan', 1978), ('Max', 1985)])

>>> rdd1.take(1)
[('Joe', 1980)]

>>> rdd1.collect()
[('Joe', 1980), ('Huan', 1978), ('Max', 1985)]

>>> rdd1.collect()[1]
('Huan', 1978)

That's definitely enough for "playing" with PySpark, but for complex applications it's not an option: we need to set up a dev environment.

Option 2: setup PySpark dev env


Python setup

On my Ubuntu machine I have both Python 2 and Python 3 installed, but the default is 2. To use Python 3, I added this line to my bash profile file ~/.bashrc:
alias python=python3

Let's check:
~$ python --version
Python 3.6.8

Great! 

Pip3 

Next, let's install pip3: 
sudo apt-get -y install python3-pip

For me, pip3 was not working after installation, so I had to modify the file /usr/bin/pip3:

#from pip import main
from pip import __main__
if __name__ == '__main__':
#    sys.exit(main())
    sys.exit(__main__._main())

And let's check it: 
~$ pip3 --version
pip 19.2.3 from /home/dmitry/.local/lib/python3.6/site-packages/pip (python 3.6)

It works! 

Pipenv

Now let's install pipenv using the just-installed pip3:
~$ sudo pip3 install pipenv
Installing collected packages: pipenv
Successfully installed pipenv-2018.11.26

Dev env

And now we're finally ready to go! 

Next let's create a folder for our test application:

mkdir pyspark-test
cd pyspark-test

Now let's create a virtual env using pipenv: 
pyspark-test$ pipenv shell
Creating a virtualenv for this project…
✔ Successfully created virtual environment! 


Next, we can try to add some dependencies:
(pyspark-test) pyspark-test$ pipenv install pyspark
Installing pyspark…
✔ Success! 
Updated Pipfile.lock (1869ad)!
Installing dependencies from Pipfile.lock (1869ad)…

Let's check:
(pyspark-test) pyspark-test$ pip3 freeze
py4j==0.10.7
pyspark==2.4.4

Looks good: all dependencies are in place.
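
The same check can also be done from Python itself, which is handy inside scripts. This is only a minimal sketch: the helper name environment_report is made up for this post, and it only tells you whether the packages can be found, not whether Spark actually starts.

```python
import importlib.util
import sys


def environment_report(required=("pyspark", "py4j")):
    """Return a dict with the Python version and which packages are importable."""
    report = {"python": "{}.{}".format(*sys.version_info[:2])}
    for name in required:
        # find_spec returns None when the package cannot be located on sys.path
        report[name] = importlib.util.find_spec(name) is not None
    return report


if __name__ == "__main__":
    for key, value in environment_report().items():
        print(key, "->", value)
```

If "pyspark" shows False here, the script is most likely running outside the pipenv shell.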

Finally, everything is prepared for coding! I created a folder "src" and a file "main.py" inside it with the following content:

from pyspark.sql import SparkSession

if __name__ == '__main__':
    # create (or reuse) a Spark session for this application
    spark = SparkSession.builder.appName("MyTestApp").getOrCreate()

    rdd1 = spark.sparkContext.parallelize([('Joe', 1980), ('Huan', 1978), ('Max', 1985)])
    print("Count: ", rdd1.count())
    print("First: ", rdd1.take(1))




Let's run it now:

(pyspark-test) pyspark-test$ python ./src/main.py 

Count:  3                                                                       
First:  [('Joe', 1980)]

Cool! Our environment is ready for coding! 


RDD

First, it's best to read about RDDs on the official web page: https://spark.apache.org/docs/latest/rdd-programming-guide.html

In short:
The main abstraction Spark provides is a resilient distributed dataset (RDD), which is a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel. RDDs are created by starting with a file in the Hadoop file system (or any other Hadoop-supported file system), or an existing Scala collection in the driver program, and transforming it. Users may also ask Spark to persist an RDD in memory, allowing it to be reused efficiently across parallel operations. Finally, RDDs automatically recover from node failures.



In the code below I show operations that can be performed on RDDs:
- RDD creation: in memory and by reading from a file
- RDD actions: count, take
- RDD transformations: map, flatMap, filter


from pyspark.sql import SparkSession

def main():
    # init spark
    spark = SparkSession.builder.appName("MyTestApp").getOrCreate()
    sc = spark.sparkContext

    # create rdd "in-memory"
    rdd1 = sc.parallelize([('Joe', 1980), ('Huan', 1978), ('Max', 1985)])
    print("RDD1 Count: ", rdd1.count()) #RDD1 Count:  3 
    print("RDD1 First: ", rdd1.take(1)) #RDD1 First:  [('Joe', 1980)]

    # create rdd by reading from file
    rdd2 = sc.textFile("test/test-data/groups.csv")
    print("RDD2 All: ", rdd2.take(10)) #RDD2 All:  ['101,Admin', '102,Dev', '103,DB']

    # RDD transformation using MAP method
    rdd2formatted = rdd2 \
        .map(lambda line: line.split(",")) \
        .map(lambda arr: (arr[0], arr[1]))
    print("RDD2Formatted All: ", rdd2formatted.take(10)) 
    #RDD2Formatted All:  [('101', 'Admin'), ('102', 'Dev'), ('103', 'DB')]

    # RDD filtering using FILTER method
    rdd2filtered = rdd2formatted.filter(lambda row: int(row[0]) > 101)
    print("RDD2Filtered All: ", rdd2filtered.take(10))
    #RDD2Filtered All:  [('102', 'Dev'), ('103', 'DB')]

    # Flattening RDD
    rdd2FilteredFlat = rdd2filtered.flatMap(lambda row: (row[0], row[1]))
    print("RDD2FilteredFlat All: ", rdd2FilteredFlat.take(10))
    #RDD2FilteredFlat All:  ['102', 'Dev', '103', 'DB']


if __name__ == '__main__':
    main()

P.S. The content of "test/test-data/groups.csv" is the following:
101,Admin
102,Dev
103,DB
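
To see what the three transformations compute without starting Spark, here is a plain-Python sketch on the same three lines. It is only an analogy: Python lists are evaluated eagerly on one machine, while Spark evaluates transformations lazily and across partitions. The variable names are mine, not part of any API.

```python
from itertools import chain

lines = ["101,Admin", "102,Dev", "103,DB"]  # same rows as groups.csv

# map: exactly one output element per input element
formatted = [tuple(line.split(",")) for line in lines]

# filter: keep only the elements for which the predicate is true
filtered = [row for row in formatted if int(row[0]) > 101]

# flatMap: map each element to a sequence, then concatenate the sequences
flat = list(chain.from_iterable(filtered))

print(formatted)  # [('101', 'Admin'), ('102', 'Dev'), ('103', 'DB')]
print(filtered)   # [('102', 'Dev'), ('103', 'DB')]
print(flat)       # ['102', 'Dev', '103', 'DB']
```

The difference between map and flatMap is visible in the last two results: map keeps one tuple per row, flatMap dissolves the tuples into one flat sequence.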



DataFrame

First, I would again suggest reading the official documentation: https://spark.apache.org/docs/latest/sql-programming-guide.html.
In short, if you know what an RDD is, it's very easy to understand what a DataFrame is: it's an RDD plus a schema, where the schema is information about field names and types. If we have structured data such as JSON or CSV, we can just read it using Spark and it will take the schema from the files (in the case of CSV, the field names come from the header).

A DataFrame is a Dataset organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. 
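
The "rows plus schema" idea can be illustrated without Spark. The sketch below is a plain-Python analogy, not Spark code: the CSV text is an assumed in-memory copy of a small departments file, and the schema dict is a stand-in I made up for a real Spark schema.

```python
import csv
import io

# Assumed in-memory copy of a small CSV file with a header row
raw = "depId,depName\n101,Admin\n102,Dev\n103,DB\n"

reader = csv.DictReader(io.StringIO(raw))
field_names = reader.fieldnames       # field names come from the header
rows = [dict(row) for row in reader]  # every value is still a string

# An explicit "schema": field name -> type used to convert each value
schema = {"depId": int, "depName": str}
typed_rows = [
    {name: schema[name](value) for name, value in row.items()}
    for row in rows
]

print(field_names)    # ['depId', 'depName']
print(rows[0])        # {'depId': '101', 'depName': 'Admin'}
print(typed_rows[0])  # {'depId': 101, 'depName': 'Admin'}
```

Spark behaves in a similar way for CSV: with only the header option set, the columns are read as strings, and the types have to be inferred or supplied separately.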


Below I show how to create a DataFrame, some basic operations, and how to join DataFrames. The most interesting thing here is that there are two ways to transform data:
- using Spark API
- using Spark SQL

from pyspark.sql import SparkSession

def main():
    # init spark
    spark = SparkSession.builder.appName("MyTestApp").getOrCreate()
    sc = spark.sparkContext

    # create DataFrame from RDD
    rdd1 = sc.parallelize((
    """{"id": "1001", "name": "Joe",  "depId": 101}""",
    """{"id": "1002", "name": "Huan", "depId": 102}""",
    """{"id": "1003", "name": "Max",  "depId": 103}"""
    ))
    print(rdd1.take(2)) 
    # ['{"id": "1001", "name": "Joe",  "depId": 101}', '{"id": "1002", "name": "Huan", "depId": 102}']    
    df1 = spark.read.json(rdd1)
    df1.show()
    # +-----+----+----+
    # |depId|  id|name|
    # +-----+----+----+
    # |  101|1001| Joe|
    # |  102|1002|Huan|
    # |  103|1003| Max|
    # +-----+----+----+

    # Basic operations using Spark API
    df1.select("name", "depId").where("id='1001'").show()
    #+----+-----+
    #|name|depId|
    #+----+-----+
    #| Joe|  101|
    #+----+-----+    

    # Basic operation using Spark SQL
    df1.createOrReplaceTempView("users")
    spark.sql("SELECT concat(name,' has id>1001') as user_name FROM users WHERE id>1001").show()
    #+----------------+
    #|       user_name|
    #+----------------+
    #|Huan has id>1001|
    #| Max has id>1001|
    #+----------------+

    # create DataFrame by reading from file
    df2 = spark.read.option("header",True).csv("test/test-data/depts.csv")
    df2.show()
    #+-----+-------+
    #|depId|depName|
    #+-----+-------+
    #|  101|  Admin|
    #|  102|    Dev|
    #|  103|     DB|
    #+-----+-------+

    # join 2 DataFrames using Spark API
    joined1 = df1.join(df2, df1.depId==df2.depId).drop(df2.depId)
    joined1.show()
    #+-----+----+----+-------+
    #|depId|  id|name|depName|
    #+-----+----+----+-------+
    #|  101|1001| Joe|  Admin|
    #|  102|1002|Huan|    Dev|
    #|  103|1003| Max|     DB|
    #+-----+----+----+-------+    

    # join 2 DataFrames using Spark SQL
    df2.createOrReplaceTempView("deps")
    joined2 = spark.sql("SELECT u.id, u.name, d.depName FROM users u, deps d WHERE u.depId = d.depId")
    joined2.show()
    #+----+----+-------+
    #|  id|name|depName|
    #+----+----+-------+
    #|1001| Joe|  Admin|
    #|1002|Huan|    Dev|
    #|1003| Max|     DB|
    #+----+----+-------+





if __name__ == '__main__':
    main()
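
For readers who want to see what the inner join above computes row by row, here is a small plain-Python sketch over the same data. The function inner_join is a helper written for this post, not a Spark API, and Spark's real join is distributed and optimized quite differently.

```python
users = [
    {"id": "1001", "name": "Joe", "depId": 101},
    {"id": "1002", "name": "Huan", "depId": 102},
    {"id": "1003", "name": "Max", "depId": 103},
]
depts = [
    {"depId": 101, "depName": "Admin"},
    {"depId": 102, "depName": "Dev"},
    {"depId": 103, "depName": "DB"},
]


def inner_join(left, right, key):
    """Inner join two lists of dicts on `key` using a lookup table."""
    index = {}
    for row in right:
        index.setdefault(row[key], []).append(row)
    joined = []
    for row in left:
        # rows without a match on the right side are dropped (inner join)
        for match in index.get(row[key], []):
            merged = dict(row)
            merged.update(match)
            joined.append(merged)
    return joined


joined = inner_join(users, depts, "depId")
print(joined[0])  # {'id': '1001', 'name': 'Joe', 'depId': 101, 'depName': 'Admin'}
```

A user whose depId has no matching department would simply disappear from the result, which is the same behavior as the default join in the DataFrame example.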





The end

And that's basically it. Of course, it's impossible to cover everything about RDDs and DataFrames in one post (it would be a book), so I just tried to show some very basic things for understanding "who is who" :)