In this post I'm going to show the first steps for working with the following components of PySpark: RDDs and DataFrames.
To start working with PySpark we have two options:
- the PySpark shell from the Spark distribution
- setting up a dev environment of our own
Let's take a closer look at both of them.
Option 1: Spark-shell
The simplest way to play with PySpark is to use the Python Spark shell. First you have to download Spark from the official web page (https://spark.apache.org/downloads.html). Next, unpack it and run "pyspark" from the "bin" folder. You should see something like this:
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.4.4
      /_/

Using Python version 2.7.15+ (default, Oct 7 2019 17:39:04)
SparkSession available as 'spark'.
And we're ready to go! Let's try some basic operations:
>>> rdd1 = spark.sparkContext.parallelize([('Joe', 1980), ('Huan', 1978), ('Max', 1985)])
>>> rdd1.take(1)
[('Joe', 1980)]
>>> rdd1.collect()
[('Joe', 1980), ('Huan', 1978), ('Max', 1985)]
>>> rdd1.collect()[1]
('Huan', 1978)
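Transformations can be tried interactively as well. For example, a couple of quick ones on the rdd1 defined above (a small illustration of mine, not required for the rest of the post):
>>> rdd1.map(lambda t: t[0]).collect()
['Joe', 'Huan', 'Max']
>>> rdd1.filter(lambda t: t[1] > 1979).collect()
[('Joe', 1980), ('Max', 1985)]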
It's definitely enough for "playing" with PySpark, but for complex applications it's not an option: we need to set up a dev environment.
Option 2: setup PySpark dev env
Python setup
On my Ubuntu I have both Python 2 and 3 installed, but the default is 2. To use Python 3 I updated the bash profile file ~/.bashrc by adding the line:
alias python=python3
Let's check:
~$ python --version
Python 3.6.8
Great!
Pip3
Next, let's install pip3:
sudo apt-get -y install python3-pip
For me, pip3 was not working after installation, so I had to modify the file /usr/bin/pip3:
# from pip import main
from pip import __main__
if __name__ == '__main__':
    # sys.exit(main())
    sys.exit(__main__._main())
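For context, after the change the whole /usr/bin/pip3 looked roughly like this (a sketch of mine; the exact file generated by your distribution may differ):
#!/usr/bin/python3
# Debian-generated wrapper, patched to go through pip's __main__ module
import sys

# from pip import main
from pip import __main__

if __name__ == '__main__':
    # sys.exit(main())
    sys.exit(__main__._main())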
And let's check it:
~$ pip3 --version
pip 19.2.3 from /home/dmitry/.local/lib/python3.6/site-packages/pip (python 3.6)
It works!
Pipenv
Now let's install pipenv using the freshly installed pip3:
~$ sudo pip3 install pipenv
Installing collected packages: pipenv
Successfully installed pipenv-2018.11.26
Dev env
And now we're finally ready to go!
Next, let's create a folder for our test application:
mkdir pyspark-test
cd pyspark-test
Now let's create a virtual env using pipenv:
pyspark-test$ pipenv shell
Creating a virtualenv for this project…
✔ Successfully created virtual environment!
Next, we can try to add some dependencies:
(pyspark-test) pyspark-test$ pipenv install pyspark
Installing pyspark…
✔ Success!
Updated Pipfile.lock (1869ad)!
Installing dependencies from Pipfile.lock (1869ad)…
Let's check:
(pyspark-test) pyspark-test$ pip3 freeze
py4j==0.10.7
pyspark==2.4.4
Looks good: all dependencies are in place.
Finally, everything is prepared for coding! I created a folder "src" with a file "main.py" inside, with the following content:
from pyspark.sql import SparkSession

if __name__ == '__main__':
    spark = SparkSession.builder.appName("MyTestApp").getOrCreate()
    rdd1 = spark.sparkContext.parallelize([('Joe', 1980), ('Huan', 1978), ('Max', 1985)])
    print("Count: ", rdd1.count())
    print("First: ", rdd1.take(1))
Let's run it now:
(pyspark-test) pyspark-test$ python ./src/main.py
Count: 3
First: [('Joe', 1980)]
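As a side note, the same script can also be launched via spark-submit, which the pyspark pip package puts into the virtualenv's bin directory (at least in my setup), e.g.:
(pyspark-test) pyspark-test$ spark-submit ./src/main.py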
Cool! Our environment is ready for coding!
RDD
First, it's better to read about RDDs on the official web page: https://spark.apache.org/docs/latest/rdd-programming-guide.html. In short:
The main abstraction Spark provides is a resilient distributed dataset (RDD), which is a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel. RDDs are created by starting with a file in the Hadoop file system (or any other Hadoop-supported file system), or an existing Scala collection in the driver program, and transforming it. Users may also ask Spark to persist an RDD in memory, allowing it to be reused efficiently across parallel operations. Finally, RDDs automatically recover from node failures.
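One part of that quote worth highlighting is persistence: if an RDD is reused by several actions, you can ask Spark to keep it in memory instead of recomputing it. A minimal sketch of mine (assuming sc is an existing SparkContext):
numbers = sc.parallelize(range(1, 1000))
numbers.cache()          # mark the RDD to be kept in memory once computed
print(numbers.count())   # the first action computes and caches the partitions
print(numbers.sum())     # the second action reuses the cached data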
In the code provided below I'm showing operations which can be performed on RDDs:
- RDD creation: in memory and by reading from a file
- RDD actions: count, take
- RDD transformations: map, flatMap, filter
from pyspark.sql import SparkSession


def main():
    # init spark
    spark = SparkSession.builder.appName("MyTestApp").getOrCreate()
    sc = spark.sparkContext

    # create rdd "in-memory"
    rdd1 = sc.parallelize([('Joe', 1980), ('Huan', 1978), ('Max', 1985)])
    print("RDD1 Count: ", rdd1.count())  # RDD1 Count: 3
    print("RDD1 First: ", rdd1.take(1))  # RDD1 First: [('Joe', 1980)]

    # create rdd by reading from file
    rdd2 = sc.textFile("test/test-data/groups.csv")
    print("RDD2 All: ", rdd2.take(10))  # RDD2 All: ['101,Admin', '102,Dev', '103,DB']

    # RDD transformation using MAP method
    rdd2formatted = rdd2 \
        .map(lambda line: line.split(",")) \
        .map(lambda arr: (arr[0], arr[1]))
    print("RDD2Formatted All: ", rdd2formatted.take(10))
    # RDD2Formatted All: [('101', 'Admin'), ('102', 'Dev'), ('103', 'DB')]

    # RDD filtering using FILTER method
    rdd2filtered = rdd2formatted.filter(lambda row: int(row[0]) > 101)
    print("RDD2Filtered All: ", rdd2filtered.take(10))
    # RDD2Filtered All: [('102', 'Dev'), ('103', 'DB')]

    # Flattening RDD
    rdd2FilteredFlat = rdd2filtered.flatMap(lambda row: (row[0], row[1]))
    print("RDD2FilteredFlat All: ", rdd2FilteredFlat.take(10))
    # RDD2FilteredFlat All: ['102', 'Dev', '103', 'DB']


if __name__ == '__main__':
    main()
P.S. The content of "test/test-data/groups.csv" is the following:
101,Admin
102,Dev
103,DB
DataFrame
First, I would again suggest reading the official documentation: https://spark.apache.org/docs/latest/sql-programming-guide.html. In short, if you know what an RDD is, it's very easy to understand what a DataFrame is: it's an RDD + schema, where the schema is information about field names and types. If we have structured data like JSON or CSV, we can just read it with Spark and it will take the schema from the files (in the case of CSV, from the header).
A DataFrame is a Dataset organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood.
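To make the "RDD + schema" idea a bit more tangible, here is a small sketch of mine (the names below are illustrative and not part of the main example): we can attach an explicit schema to an RDD to get a DataFrame, and every DataFrame exposes its underlying RDD of Rows:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder.appName("RddPlusSchema").getOrCreate()
rdd = spark.sparkContext.parallelize([("Joe", 1980), ("Huan", 1978)])

# RDD + schema -> DataFrame
schema = StructType([
    StructField("name", StringType(), True),
    StructField("birthYear", IntegerType(), True)
])
df = spark.createDataFrame(rdd, schema)
df.printSchema()

# ...and back: the DataFrame still carries an RDD underneath
print(df.rdd.take(1))  # [Row(name='Joe', birthYear=1980)]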
Below I'm showing how to create a DataFrame, some basic operations, and how to join DataFrames. The most interesting thing here: there are two ways to transform the data:
- using the Spark API
- using Spark SQL
from pyspark.sql import SparkSession


def main():
    # init spark
    spark = SparkSession.builder.appName("MyTestApp").getOrCreate()
    sc = spark.sparkContext

    # create DataFrame from RDD
    rdd1 = sc.parallelize((
        """{"id": "1001", "name": "Joe", "depId": 101}""",
        """{"id": "1002", "name": "Huan", "depId": 102}""",
        """{"id": "1003", "name": "Max", "depId": 103}"""
    ))
    print(rdd1.take(2))
    # ['{"id": "1001", "name": "Joe", "depId": 101}', '{"id": "1002", "name": "Huan", "depId": 102}']

    df1 = spark.read.json(rdd1)
    df1.show()
    # +-----+----+----+
    # |depId|  id|name|
    # +-----+----+----+
    # |  101|1001| Joe|
    # |  102|1002|Huan|
    # |  103|1003| Max|
    # +-----+----+----+

    # Basic operations using Spark API
    df1.select("name", "depId").where("id='1001'").show()
    # +----+-----+
    # |name|depId|
    # +----+-----+
    # | Joe|  101|
    # +----+-----+

    # Basic operation using Spark SQL
    df1.createOrReplaceTempView("users")
    spark.sql("SELECT concat(name,' has id>1001') as user_name FROM users WHERE id>1001").show()
    # +----------------+
    # |       user_name|
    # +----------------+
    # |Huan has id>1001|
    # | Max has id>1001|
    # +----------------+

    # create DataFrame by reading from file
    df2 = spark.read.option("header", True).csv("test/test-data/depts.csv")
    df2.show()
    # +-----+-------+
    # |depId|depName|
    # +-----+-------+
    # |  101|  Admin|
    # |  102|    Dev|
    # |  103|     DB|
    # +-----+-------+

    # join 2 DataFrames using Spark API
    joined1 = df1.join(df2, df1.depId == df2.depId).drop(df2.depId)
    joined1.show()
    # +-----+----+----+-------+
    # |depId|  id|name|depName|
    # +-----+----+----+-------+
    # |  101|1001| Joe|  Admin|
    # |  102|1002|Huan|    Dev|
    # |  103|1003| Max|     DB|
    # +-----+----+----+-------+

    # join 2 DataFrames using Spark SQL
    df2.createOrReplaceTempView("deps")
    joined2 = spark.sql("SELECT u.id, u.name, d.depName FROM users u, deps d WHERE u.depId = d.depId")
    joined2.show()
    # +----+----+-------+
    # |  id|name|depName|
    # +----+----+-------+
    # |1001| Joe|  Admin|
    # |1002|Huan|    Dev|
    # |1003| Max|     DB|
    # +----+----+-------+
if __name__ == '__main__':
    main()

And that is basically it. Of course, it's impossible to show all the RDD and DataFrame stuff in one post (it would be a book), so I just tried to show some very basic things to understand "who is who" :)