Monday, February 3, 2020

PySpark data preparation

In the previous post (PySpark getting started) I explained how to set up a PySpark dev environment and showed some basic operations. This post is the next step: everything is in place, we have some data, and we are about to start the interesting stuff like processing/modeling, but first we have to prepare the data.


Starting point of my simple program

To start, we should import some dependencies from the PySpark library:

from pyspark.sql import SparkSession
import pyspark.sql.functions as sqlfun

if __name__ == "__main__":
    spark = SparkSession.builder.appName("MyDataPrepareApp").getOrCreate()
    spark.sparkContext.setLogLevel('WARN')



Test data 

Let's generate some simple dataset:

print("Initial dataset:")
df = spark.createDataFrame([
    (1, "Joe", 34, None),
    (1, "Joe", 34, None),
    (2, "Huan", 36, 18000),
    (3, "Huan", 36, 18000),
    (4, None, 38, 19000),
    (5, "Sebastyan", 35, None),
    (7, "Anna", 30, 14000),
    (8, None, 24, 11000),
    (9, "Jordan", None, 15000),
    (10, None, 28, 13000),

    (1001, "ExtraLow", 1, 1),
    (1002, "ExtraBig", 80, 50000),
], schema=["id", "name", "age", "salary"])
df.show()



Output: 

Initial dataset:
+----+---------+----+------+
|  id|     name| age|salary|
+----+---------+----+------+
|   1|      Joe|  34|  null|
|   1|      Joe|  34|  null|
|   2|     Huan|  36| 18000|
|   3|     Huan|  36| 18000|
|   4|     null|  38| 19000|
|   5|Sebastyan|  35|  null|
|   7|     Anna|  30| 14000|
|   8|     null|  24| 11000|
|   9|   Jordan|null| 15000|
|  10|     null|  28| 13000|
|1001| ExtraLow|   1|     1|
|1002| ExtraBig|  80| 50000|
+----+---------+----+------+



Duplicates 

As you can see, there are some duplicates in our dataset (2 records with ID=1). Let's find and remove them:

# drop duplicates
dfDistinct = df.dropDuplicates()
print("Distinct:")
dfDistinct.show()



Output: 

Distinct:
+----+---------+----+------+
|  id|     name| age|salary|
+----+---------+----+------+
|1001| ExtraLow|   1|     1|
|   9|   Jordan|null| 15000|
|   4|     null|  38| 19000|
|   1|      Joe|  34|  null|
|   3|     Huan|  36| 18000|
|   8|     null|  24| 11000|
|   2|     Huan|  36| 18000|
|   7|     Anna|  30| 14000|
|1002| ExtraBig|  80| 50000|
|  10|     null|  28| 13000|
|   5|Sebastyan|  35|  null|
+----+---------+----+------+



That's better: the duplicates with ID=1 are gone - just one record is left. But we still have 2 rows with different IDs (2 and 3) that are in fact the same. Let's look for duplicates over the columns other than ID:


# drop duplicates other than ID column
colsExceptID = [col for col in df.columns if col != "id"]
print("Columns except [id]:", colsExceptID, ". Making distinct by them:")
dfDistinctExId = dfDistinct.dropDuplicates(colsExceptID)
dfDistinctExId.show()


Output:
Columns except [id]: ['name', 'age', 'salary'] . Making distinct by them:
+----+---------+----+------+
|  id|     name| age|salary|
+----+---------+----+------+
|   3|     Huan|  36| 18000|
|   1|      Joe|  34|  null|
|  10|     null|  28| 13000|
|   9|   Jordan|null| 15000|
|1001| ExtraLow|   1|     1|
|   7|     Anna|  30| 14000|
|   4|     null|  38| 19000|
|   8|     null|  24| 11000|
|   5|Sebastyan|  35|  null|
|1002| ExtraBig|  80| 50000|
+----+---------+----+------+

Good: only one of the two remained (in this run the record with ID=3; the one with ID=2 was removed).
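If you want to inspect which rows are duplicated before dropping them, a groupBy/count works too. A minimal sketch on top of the same df (my addition, not part of the original flow):

# show the (name, age, salary) combinations that occur more than once
dupes = (df.groupBy("name", "age", "salary")
           .count()
           .filter(sqlfun.col("count") > 1))
print("Duplicated combinations:")
dupes.show()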

Some basic aggregation

If we need to calculate the total number of records and the number of distinct values, we can use aggregation functions:

# some aggregations
print("Some basic aggregations:")
df.agg(
    sqlfun.count("id").alias("total_count"),
    sqlfun.countDistinct("id").alias("distinct_id"),
    sqlfun.countDistinct("name").alias("distinct_name")
).show()

Output: 

Some basic aggregations:
+-----------+-----------+-------------+
|total_count|distinct_id|distinct_name|
+-----------+-----------+-------------+
|         12|         11|            7|
+-----------+-----------+-------------+
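Note that count(col) and countDistinct(col) skip null values: the 3 rows with a null name are not counted at all, which is why distinct_name is 7 (null is not treated as a separate value).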




Missing values

As we can see, some values in the dataset are missing. Let's calculate how many records are missing, for example, in the AGE column:

# dealing with missing values
print("Percentage of missing values for [age] column")
dfDistinctExId.agg(
    (100 - sqlfun.count("age") / sqlfun.count("*") * 100).alias("age_missing")
).show()

Output:

+-----------+
|age_missing|
+-----------+
|       10.0|
+-----------+

So 10% of the values in the AGE column are missing.


We can also use a more generic way to calculate missing values for several columns in one shot:

print("Percentage of missing values for all columns")
dfDistinctExId.agg(*[  # the * unpacks the list, so each element is passed as a separate argument
    (100 - sqlfun.count(col) / sqlfun.count("*") * 100).alias(col + "_missing")
    for col in dfDistinctExId.columns
]).show()

Output: 

Percentage of missing values for all columns
+----------+------------+-----------+--------------+
|id_missing|name_missing|age_missing|salary_missing|
+----------+------------+-----------+--------------+
|       0.0|        30.0|       10.0|          20.0|
+----------+------------+-----------+--------------+


Filling empty values

Now that we have identified the missing values, we can try to fill them. A popular approach is to fill them with the column mean (or median); here I use the mean. Let's calculate it for the numeric columns (AGE and SALARY):

# calculate mean
means = dfDistinctExId.agg(*[
    sqlfun.mean(col).alias(col + "_mean")
    for col in dfDistinctExId.columns if col != "id" and col != "name"])
print("Calculated medians(means):")
means.show()



Output: 

Calculated means:
+--------+-----------+
|age_mean|salary_mean|
+--------+-----------+
|    34.0|  17500.125|
+--------+-----------+

This form is readable, but to pass the values to fillna() we need to transform it into a dictionary:

# creating dictionary for means
meansDict = {}
for i in range(len(means.columns)):
    colName = means.columns[i]
    colNameForDict = colName.replace("_mean", "")
    meanValue = means.first()[i]
    meansDict[colNameForDict] = meanValue
meansDict["name"] = "Unknown"print("Dictionary which will be used to fill missing values:")
print(meansDict)


Output:

Dictionary which will be used to fill missing values:
{'age': 34.0, 'salary': 17500.125, 'name': 'Unknown'}



For "name" column we can't use median value so we will be using word "Unknown".

And finally, using the dictionary from above, we can fill the empty values:

dfFull = dfDistinctExId.fillna(meansDict)
print("With filled missing values:")
dfFull.show()


Output:

With filled missing values:
+----+---------+---+------+
|  id|     name|age|salary|
+----+---------+---+------+
|   3|     Huan| 36| 18000|
|   1|      Joe| 34| 17500|
|  10|  Unknown| 28| 13000|
|   9|   Jordan| 34| 15000|
|1001| ExtraLow|  1|     1|
|   7|     Anna| 30| 14000|
|   4|  Unknown| 38| 19000|
|   8|  Unknown| 24| 11000|
|   5|Sebastyan| 35| 17500|
|1002| ExtraBig| 80| 50000|
+----+---------+---+------+
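Note that age and salary were created from Python integers, so they are integer (long) columns; the fill value is cast back to the column type, which is why the salary mean 17500.125 shows up as 17500 in the filled rows.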



Outliers

Great! We don't have any empty values! But what about the other values? Some records (with IDs 1001 and 1002) look too big, and some values are too small: these are outliers. Let's find and remove them. We are interested here in the AGE and SALARY columns. First we have to calculate the quantiles: 0.5 is the median of the dataset, 0.25 (Q1) is the median of the lower half, and 0.75 (Q3) is the median of the upper half. In the code I calculate all of them (0.0, 0.25, 0.5, 0.75, 1.0), but actually we need just two: Q1 (0.25) and Q3 (0.75). Then we calculate the IQR as their difference (IQR = Q3 - Q1) and use it to compute the lower and upper threshold boundaries: Q1 - 1.5 * IQR and Q3 + 1.5 * IQR. Values lower than the lower threshold or higher than the upper threshold should be removed.



# calculate threshold boundaries for several columns (age and salary) and filter data
cols = ["age", "salary"]

dfFiltered = dfFull
for currentCol in cols:
    qs = dfFull.approxQuantile(currentCol, [0.0, 0.25, 0.5, 0.75, 1.0], 0.05)
    Q1 = qs[1]
    Q3 = qs[3]
    IQR = Q3 - Q1
    thresholdLow = Q1 - 1.5 * IQR
    thresholdHigh = Q3 + 1.5 * IQR
    print("For", currentCol, "low threshold is", thresholdLow, " and high threshold is", thresholdHigh)
    dfFiltered = dfFiltered.filter(
        (sqlfun.col(currentCol) > thresholdLow) & (sqlfun.col(currentCol) < thresholdHigh))

print("Filtered by thresholds:")
dfFiltered.show()

Output: 

For age low threshold is 16.0  and high threshold is 48.0
For salary low threshold is 5500.0  and high threshold is 25500.0
Filtered by thresholds:
+---+---------+---+------+
| id|     name|age|salary|
+---+---------+---+------+
|  3|     Huan| 36| 18000|
|  1|      Joe| 34| 17500|
| 10|  Unknown| 28| 13000|
|  9|   Jordan| 34| 15000|
|  7|     Anna| 30| 14000|
|  4|  Unknown| 38| 19000|
|  8|  Unknown| 24| 11000|
|  5|Sebastyan| 35| 17500|
+---+---------+---+------+


As we wanted, the records with extremely big/small values were removed.
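As a quick sanity check for [age]: working backwards from the printed thresholds, approxQuantile returned Q1 = 28 and Q3 = 36, so IQR = 36 - 28 = 8, and the boundaries are 28 - 1.5 * 8 = 16 and 36 + 1.5 * 8 = 48, exactly as printed.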

The end

We removed duplicates and outliers and filled the empty values - our data is ready for further processing/modeling! :)

Just in case, here is the full code:

from pyspark.sql import SparkSession
import pyspark.sql.functions as sqlfun

if __name__ == "__main__":
    spark = SparkSession.builder.appName("MyDataPrepareApp").getOrCreate()
    spark.sparkContext.setLogLevel('WARN')

    print("Initial dataset:")
    df = spark.createDataFrame([
        (1, "Joe", 34, None),
        (1, "Joe", 34, None),
        (2, "Huan", 36, 18000),
        (3, "Huan", 36, 18000),
        (4, None, 38, 19000),
        (5, "Sebastyan", 35, None),
        (7, "Anna", 30, 14000),
        (8, None, 24, 11000),
        (9, "Jordan", None, 15000),
        (10, None, 28, 13000),

        (1001, "ExtraLow", 1, 1),
        (1002, "ExtraBig", 80, 50000),
    ], schema=["id", "name", "age", "salary"])
    df.show()


    # drop duplicates
    dfDistinct = df.dropDuplicates()
    print("Distinct:")
    dfDistinct.show()

    # drop duplicates other than ID column
    colsExceptID = [col for col in df.columns if col != "id"]
    print("Columns except [id]:", colsExceptID, ". Making distinct by them:")
    dfDistinctExId = dfDistinct.dropDuplicates(colsExceptID)
    dfDistinctExId.show()

    # some aggregations
    print("Some basic aggregations:")
    df.agg(
        sqlfun.count("id").alias("total_count"),
        sqlfun.countDistinct("id").alias("distinct_id"),
        sqlfun.countDistinct("name").alias("distinct_name")
    ).show()

    # add new id column
    print("With new created ID column")
    dfNewId = dfDistinctExId.withColumn("new_id", sqlfun.monotonically_increasing_id())
    dfNewId.show()

    # dealing with missing values
    print("Percentage of missing values for [age] column")
    dfDistinctExId.agg(
        (100 - sqlfun.count("age") / sqlfun.count("*") * 100).alias("age_missing")
    ).show()

    print("Percentage of missing values for all columns")
    dfDistinctExId.agg(*[  # the * unpacks the list, so each element is passed as a separate argument
        (100 - sqlfun.count(col) / sqlfun.count("*") * 100).alias(col + "_missing")
        for col in dfDistinctExId.columns
    ]).show()

    ####################################
    # filling missing values with MEAN
    ####################################
    # calculate mean
    means = dfDistinctExId.agg(*[
        sqlfun.mean(col).alias(col + "_mean")
        for col in dfDistinctExId.columns if col != "id" and col != "name"
    ])
    print("Calculated medians(means):")
    means.show()

    # creating dictionary for means
    meansDict = {}
    for i in range(len(means.columns)):
        colName = means.columns[i]
        colNameForDict = colName.replace("_mean", "")
        meanValue = means.first()[i]
        meansDict[colNameForDict] = meanValue
    meansDict["name"] = "Unknown"    print("Dictionary which will be used to fill missing values:")
    print(meansDict)

    # filling empty values using means from dictionary
    dfFull = dfDistinctExId.fillna(meansDict)
    print("With filled missing values:")
    dfFull.show()

    ###########
    # outliers
    ###########
    # calculate quartiles: Q0, Q1, Q2, Q3, Q4 for [age] column
    qs = dfFull.approxQuantile("age", [0.0, 0.25, 0.5, 0.75, 1.0], 0.05)
    # the last parameter is the relative error ("accuracy"); it can be 0, but that would be expensive to compute
    print("quartiles: Q0, Q1, Q2, Q3, Q4 for [age] column:", qs)

    # calculate threshold boundaries for several columns (age and salary) and filter data
    cols = ["age", "salary"]

    dfFiltered = dfFull
    for currentCol in cols:
        qs = dfFull.approxQuantile(currentCol, [0.0, 0.25, 0.5, 0.75, 1.0], 0.05)
        Q1 = qs[1]
        Q3 = qs[3]
        IQR = Q3 - Q1
        thresholdLow = Q1 - 1.5 * IQR
        thresholdHigh = Q3 + 1.5 * IQR
        print("For", currentCol, "low threshold is", thresholdLow, " and high threshold is", thresholdHigh)
        dfFiltered = dfFiltered.filter(
            (sqlfun.col(currentCol) > thresholdLow) & (sqlfun.col(currentCol) < thresholdHigh))

    print("Filtered by thresholds:")
    dfFiltered.show()