In the previous post (PySpark getting started) I explained how to set up a PySpark dev environment and showed some basic operations. This post is the next step. Everything is in place, we have some data, and we are about to start the interesting stuff like processing and modeling. But first, we have to prepare our data.
Starting point of my simple program
To start, we import the dependencies we need from the PySpark library and create a SparkSession:

```python
from pyspark.sql import SparkSession
import pyspark.sql.functions as sqlfun

if __name__ == "__main__":
    spark = SparkSession.builder.appName("MyDataPrepareApp").getOrCreate()
    spark.sparkContext.setLogLevel('WARN')
```
Test data
Let's generate a simple dataset:

```python
print("Initial dataset:")
df = spark.createDataFrame([
    (1, "Joe", 34, None),
    (1, "Joe", 34, None),
    (2, "Huan", 36, 18000),
    (3, "Huan", 36, 18000),
    (4, None, 38, 19000),
    (5, "Sebastyan", 35, None),
    (7, "Anna", 30, 14000),
    (8, None, 24, 11000),
    (9, "Jordan", None, 15000),
    (10, None, 28, 13000),
    (1001, "ExtraLow", 1, 1),
    (1002, "ExtraBig", 80, 50000),
], schema=["id", "name", "age", "salary"])
df.show()
```
Output:
```
Initial dataset:
+----+---------+----+------+
|  id|     name| age|salary|
+----+---------+----+------+
|   1|      Joe|  34|  null|
|   1|      Joe|  34|  null|
|   2|     Huan|  36| 18000|
|   3|     Huan|  36| 18000|
|   4|     null|  38| 19000|
|   5|Sebastyan|  35|  null|
|   7|     Anna|  30| 14000|
|   8|     null|  24| 11000|
|   9|   Jordan|null| 15000|
|  10|     null|  28| 13000|
|1001| ExtraLow|   1|     1|
|1002| ExtraBig|  80| 50000|
+----+---------+----+------+
```
Duplicates
As you can see, there are some duplicates in our dataset (2 records with ID=1). Let's find and remove them:

```python
# drop duplicates
dfDistinct = df.dropDuplicates()
print("Distinct:")
dfDistinct.show()
```
Output:
```
Distinct:
+----+---------+----+------+
|  id|     name| age|salary|
+----+---------+----+------+
|1001| ExtraLow|   1|     1|
|   9|   Jordan|null| 15000|
|   4|     null|  38| 19000|
|   1|      Joe|  34|  null|
|   3|     Huan|  36| 18000|
|   8|     null|  24| 11000|
|   2|     Huan|  36| 18000|
|   7|     Anna|  30| 14000|
|1002| ExtraBig|  80| 50000|
|  10|     null|  28| 13000|
|   5|Sebastyan|  35|  null|
+----+---------+----+------+
```
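Conceptually, dropDuplicates() with no arguments treats two rows as duplicates only when every column matches, and null matches null. A plain-Python sketch of that rule follows; the helper drop_duplicates is hypothetical, not part of PySpark. Unlike Spark, it keeps the first occurrence and preserves the input order, while Spark's output order is not guaranteed, as the shuffled table above shows.

```python
def drop_duplicates(rows):
    """Keep one copy of each distinct row (all columns compared, None == None)."""
    seen = set()
    result = []
    for row in rows:
        if row not in seen:  # tuples are hashable, and None compares equal to None
            seen.add(row)
            result.append(row)
    return result


rows = [
    (1, "Joe", 34, None),
    (1, "Joe", 34, None),
    (2, "Huan", 36, 18000),
    (3, "Huan", 36, 18000),
]
print(drop_duplicates(rows))
# [(1, 'Joe', 34, None), (2, 'Huan', 36, 18000), (3, 'Huan', 36, 18000)]
```

Rows 2 and 3 both survive because their id values differ, which is exactly the problem the next step deals with.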
It's better now: there are no more duplicates with ID=1, just one record is left. But we also have 2 rows with different IDs (2 and 3) that are in fact the same. Let's look for duplicates in all columns other than ID:

```python
# drop duplicates other than ID column
colsExceptID = [col for col in df.columns if col != "id"]
print("Columns except [id]:", colsExceptID, ". Making distinct by them:")
dfDistinctExId = dfDistinct.dropDuplicates(colsExceptID)
dfDistinctExId.show()
```
Output:
```
Columns except [id]: ['name', 'age', 'salary'] . Making distinct by them:
+----+---------+----+------+
|  id|     name| age|salary|
+----+---------+----+------+
|   3|     Huan|  36| 18000|
|   1|      Joe|  34|  null|
|  10|     null|  28| 13000|
|   9|   Jordan|null| 15000|
|1001| ExtraLow|   1|     1|
|   7|     Anna|  30| 14000|
|   4|     null|  38| 19000|
|   8|     null|  24| 11000|
|   5|Sebastyan|  35|  null|
|1002| ExtraBig|  80| 50000|
+----+---------+----+------+
```
Good: only the record with ID=3 remained (the one with ID=2 was removed). Keep in mind that which of the two rows survives is not guaranteed.
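Passing a list of column names makes dropDuplicates() compare only those columns. The plain-Python sketch below illustrates the idea with a hypothetical helper drop_duplicates_by. It keeps the first row for each key, so it would keep ID=2, whereas Spark happened to keep ID=3 above.

```python
def drop_duplicates_by(rows, columns, key_columns):
    """Keep the first row for each distinct combination of key_columns."""
    positions = [columns.index(c) for c in key_columns]
    seen = set()
    result = []
    for row in rows:
        key = tuple(row[p] for p in positions)  # all other columns are ignored
        if key not in seen:
            seen.add(key)
            result.append(row)
    return result


columns = ["id", "name", "age", "salary"]
rows = [
    (1, "Joe", 34, None),
    (2, "Huan", 36, 18000),
    (3, "Huan", 36, 18000),
]
cols_except_id = [c for c in columns if c != "id"]
print(drop_duplicates_by(rows, columns, cols_except_id))
# [(1, 'Joe', 34, None), (2, 'Huan', 36, 18000)]
```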
Some basic aggregations
If we need to count the total and distinct records, we can use aggregation functions. Note that they are applied to the initial dataset df here, which is why the ID=1 duplicate still counts towards total_count:

```python
# some aggregations
print("Some basic aggregations:")
df.agg(
    sqlfun.count("id").alias("total_count"),
    sqlfun.countDistinct("id").alias("distinct_id"),
    sqlfun.countDistinct("name").alias("distinct_name")
).show()
```
Output:
```
Some basic aggregations:
+-----------+-----------+-------------+
|total_count|distinct_id|distinct_name|
+-----------+-----------+-------------+
|         12|         11|            7|
+-----------+-----------+-------------+
```
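These numbers follow from how SQL counting treats nulls: count(col) counts non-null values and countDistinct(col) counts distinct non-null values, so the three null names are simply ignored. A plain-Python sketch of both rules on the initial id and name columns (count and count_distinct are hypothetical helpers, not PySpark functions):

```python
ids = [1, 1, 2, 3, 4, 5, 7, 8, 9, 10, 1001, 1002]
names = ["Joe", "Joe", "Huan", "Huan", None, "Sebastyan", "Anna", None,
         "Jordan", None, "ExtraLow", "ExtraBig"]


def count(values):
    """Like SQL count(col): number of non-null values."""
    return sum(1 for v in values if v is not None)


def count_distinct(values):
    """Like SQL count(DISTINCT col): number of distinct non-null values."""
    return len({v for v in values if v is not None})


print(count(ids), count_distinct(ids), count_distinct(names))  # 12 11 7
print(count(names))  # 9: the three None names are skipped
```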
Missing values
As we can see, some values in the dataset are missing. Let's calculate what percentage of values is missing, for example in the AGE column. count("age") counts only the non-null values, while count("*") counts all rows:

```python
# dealing with missed values
print("Percentage of missing values for [age] column")
dfDistinctExId.agg(
    (100 - sqlfun.count("age") / sqlfun.count("*") * 100).alias("age_missing")
).show()
```
Output:
```
+-----------+
|age_missing|
+-----------+
|       10.0|
+-----------+
```
So 10% of the values in the AGE column are missing.
We can also use a more generic way to calculate the missing values for several columns in "one shot":

```python
print("Percentage of missing values for all columns")
dfDistinctExId.agg(*[
    # the * unpacks the list, so each expression is passed to agg() as a separate argument
    (100 - sqlfun.count(col) / sqlfun.count("*") * 100).alias(col + "_missing")
    for col in dfDistinctExId.columns
]).show()
```
Output:
```
Percentage of missing values for all columns
+----------+------------+-----------+--------------+
|id_missing|name_missing|age_missing|salary_missing|
+----------+------------+-----------+--------------+
|       0.0|        30.0|       10.0|          20.0|
+----------+------------+-----------+--------------+
```
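To double-check the arithmetic, here is the same formula, 100 - count(col) / count(*) * 100, written out in plain Python for the 10 deduplicated rows. It is only an illustration of the calculation, not how Spark evaluates it.

```python
columns = ["id", "name", "age", "salary"]
# the 10 rows left after both deduplication steps
rows = [
    (3, "Huan", 36, 18000),
    (1, "Joe", 34, None),
    (10, None, 28, 13000),
    (9, "Jordan", None, 15000),
    (1001, "ExtraLow", 1, 1),
    (7, "Anna", 30, 14000),
    (4, None, 38, 19000),
    (8, None, 24, 11000),
    (5, "Sebastyan", 35, None),
    (1002, "ExtraBig", 80, 50000),
]

total = len(rows)  # count("*"): every row, nulls included
missing = {}
for i, col in enumerate(columns):
    non_null = sum(1 for row in rows if row[i] is not None)  # count(col)
    missing[col + "_missing"] = 100 - non_null / total * 100
print(missing)
```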
Filling empty values
Now that we have identified the missing values, we can try to fill them. A popular approach is to fill them with the column mean (the median is a common alternative that is less sensitive to outliers). Let's calculate the mean for every column except id and name:

```python
# calculate mean
means = dfDistinctExId.agg(*[
    sqlfun.mean(col).alias(col + "_mean")
    for col in dfDistinctExId.columns if col != "id" and col != "name"
])
print("Calculated medians(means):")
means.show()
```
Output:
```
Calculated medians(means):
+--------+-----------+
|age_mean|salary_mean|
+--------+-----------+
|    34.0|  17500.125|
+--------+-----------+
```
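Why care about mean versus median? The mean is pulled by outliers. A quick check with Python's statistics module on the 8 non-null salaries (the same values Spark averaged, since mean() skips nulls) shows the 50000 salary dragging the mean well above the median. If you prefer the median, one option in Spark 3.1+ is sqlfun.percentile_approx(col, 0.5) in place of sqlfun.mean(col); that substitution is a suggestion, not what this post's code does.

```python
import statistics

# non-null salaries of the 10 deduplicated rows
salaries = [18000, 13000, 15000, 1, 14000, 19000, 11000, 50000]

print(statistics.mean(salaries))    # 17500.125
print(statistics.median(salaries))  # 14500.0
```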
This form is easy to read, but fillna() accepts a dictionary that maps column names to fill values, which lets us use a different value for each column. So let's transform the result into a dictionary:

```python
# creating dictionary for means
meansRow = means.first()  # collect the single aggregated row once
meansDict = {}
for colName in means.columns:
    colNameForDict = colName.replace("_mean", "")
    meansDict[colNameForDict] = meansRow[colName]
meansDict["name"] = "Unknown"
print("Dictionary which will be used to fill missing values:")
print(meansDict)
```
Output:
```
Dictionary which will be used to fill missing values:
{'age': 34.0, 'salary': 17500.125, 'name': 'Unknown'}
```
For the "name" column we can't compute a mean, so we use the placeholder "Unknown" instead.
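The dictionary-building loop can also be written as a dict comprehension. In PySpark, Row.asDict() turns the single row returned by means.first() into a plain dict; in the sketch below a literal dict stands in for that row, so it runs without Spark.

```python
# stand-in for means.first().asDict(): the single aggregated row as a dict
means_row = {"age_mean": 34.0, "salary_mean": 17500.125}

# strip the "_mean" suffix to get back the original column names
fill_values = {name[: -len("_mean")]: value for name, value in means_row.items()}
fill_values["name"] = "Unknown"  # a string column needs a string placeholder
print(fill_values)  # {'age': 34.0, 'salary': 17500.125, 'name': 'Unknown'}
```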
And finally, using the dictionary from above, we can fill the empty values:

```python
dfFull = dfDistinctExId.fillna(meansDict)
print("With filled missing values:")
dfFull.show()
```
Output:
```
With filled missing values:
+----+---------+---+------+
|  id|     name|age|salary|
+----+---------+---+------+
|   3|     Huan| 36| 18000|
|   1|      Joe| 34| 17500|
|  10|  Unknown| 28| 13000|
|   9|   Jordan| 34| 15000|
|1001| ExtraLow|  1|     1|
|   7|     Anna| 30| 14000|
|   4|  Unknown| 38| 19000|
|   8|  Unknown| 24| 11000|
|   5|Sebastyan| 35| 17500|
|1002| ExtraBig| 80| 50000|
+----+---------+---+------+
```
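Notice that the filled salaries are 17500, not 17500.125: age and salary were created from Python ints, so they are integer (long) columns, and the fill value is cast to the column's type. Below is a plain-Python sketch of that behavior, assuming truncation toward zero as in a double-to-long cast. The helper fill_missing and the column_types mapping are hypothetical.

```python
# hypothetical type mapping mirroring the example DataFrame
column_types = {"age": int, "salary": int, "name": str}
fill_values = {"age": 34.0, "salary": 17500.125, "name": "Unknown"}


def fill_missing(row, fill_values, column_types):
    """Replace None with the fill value, cast to the column's type."""
    filled = dict(row)
    for col, value in fill_values.items():
        if filled.get(col) is None:
            filled[col] = column_types[col](value)  # int(17500.125) -> 17500
    return filled


print(fill_missing({"id": 1, "name": "Joe", "age": 34, "salary": None},
                   fill_values, column_types))
# {'id': 1, 'name': 'Joe', 'age': 34, 'salary': 17500}
```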
Outliers
Great! We don't have any empty values anymore. But what about the other values? Some values (in the rows with IDs 1001 and 1002) look too big or too small: these are outliers. Let's find and remove them. We are interested here in the AGE and SALARY columns.

First we have to calculate quartiles: 0.5 is the median of the dataset, 0.25 (Q1) is roughly the median of the lower half of the sorted values, and 0.75 (Q3) is roughly the median of the upper half. In the code I calculate all five (0.0, 0.25, 0.5, 0.75, 1.0), but we actually need just two of them: Q1 and Q3. Then we calculate the interquartile range IQR = Q3 - Q1, and from it the low and high thresholds: low = Q1 - 1.5 * IQR and high = Q3 + 1.5 * IQR. Values lower than the low threshold or higher than the high threshold should be removed:

```python
# calculate threshold boundaries for several columns (age and salary) and filter data
cols = ["age", "salary"]
dfFiltered = dfFull
for currentCol in cols:
    # quantiles 0, 0.25, 0.5, 0.75, 1 with a relative error of 0.05
    qs = dfFull.approxQuantile(currentCol, [0.0, 0.25, 0.5, 0.75, 1.0], 0.05)
    Q1 = qs[1]
    Q3 = qs[3]
    IQR = Q3 - Q1
    thresholdLow = Q1 - 1.5 * IQR
    thresholdHigh = Q3 + 1.5 * IQR
    print("For", currentCol, "low threshold is", thresholdLow, "and high threshold is", thresholdHigh)
    # keep values inside [thresholdLow, thresholdHigh] (between() is inclusive)
    dfFiltered = dfFiltered.filter(
        sqlfun.col(currentCol).between(thresholdLow, thresholdHigh))
print("Filtered by thresholds:")
dfFiltered.show()
```
Output:
```
For age low threshold is 16.0 and high threshold is 48.0
For salary low threshold is 5500.0 and high threshold is 25500.0
Filtered by thresholds:
+---+---------+---+------+
| id|     name|age|salary|
+---+---------+---+------+
|  3|     Huan| 36| 18000|
|  1|      Joe| 34| 17500|
| 10|  Unknown| 28| 13000|
|  9|   Jordan| 34| 15000|
|  7|     Anna| 30| 14000|
|  4|  Unknown| 38| 19000|
|  8|  Unknown| 24| 11000|
|  5|Sebastyan| 35| 17500|
+---+---------+---+------+
```
As intended, the rows with extremely big or small values (IDs 1001 and 1002) were removed.
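The threshold arithmetic can be reproduced without Spark. Spark's approxQuantile is approximate; this plain-Python sketch instead uses exact nearest-rank quantiles (an assumption chosen for simplicity), which for this small dataset give the same thresholds as the output above. The helpers nearest_rank_quantile and iqr_fences are hypothetical.

```python
import math


def nearest_rank_quantile(sorted_values, p):
    """Smallest value with at least a fraction p of the data at or below it."""
    index = max(0, math.ceil(p * len(sorted_values)) - 1)
    return sorted_values[index]


def iqr_fences(values, k=1.5):
    """Return (Q1 - k * IQR, Q3 + k * IQR)."""
    ordered = sorted(values)
    q1 = nearest_rank_quantile(ordered, 0.25)
    q3 = nearest_rank_quantile(ordered, 0.75)
    iqr = q3 - q1
    return q1 - k * iqr, q3 + k * iqr


ages = [36, 34, 28, 34, 1, 30, 38, 24, 35, 80]
salaries = [18000, 17500, 13000, 15000, 1, 14000, 19000, 11000, 17500, 50000]
print(iqr_fences(ages))      # (16.0, 48.0)
print(iqr_fences(salaries))  # (5500.0, 25500.0)

low, high = iqr_fences(ages)
print([a for a in ages if low <= a <= high])  # [36, 34, 28, 34, 30, 38, 24, 35]
```

The factor k=1.5 is the conventional choice used in the post; a larger k keeps more borderline values.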
The end
We removed duplicates and outliers and filled empty values: our data are ready for further processing / modeling! :)

Just in case, here is the full code:
```python
from pyspark.sql import SparkSession
import pyspark.sql.functions as sqlfun

if __name__ == "__main__":
    spark = SparkSession.builder.appName("MyDataPrepareApp").getOrCreate()
    spark.sparkContext.setLogLevel('WARN')

    print("Initial dataset:")
    df = spark.createDataFrame([
        (1, "Joe", 34, None),
        (1, "Joe", 34, None),
        (2, "Huan", 36, 18000),
        (3, "Huan", 36, 18000),
        (4, None, 38, 19000),
        (5, "Sebastyan", 35, None),
        (7, "Anna", 30, 14000),
        (8, None, 24, 11000),
        (9, "Jordan", None, 15000),
        (10, None, 28, 13000),
        (1001, "ExtraLow", 1, 1),
        (1002, "ExtraBig", 80, 50000),
    ], schema=["id", "name", "age", "salary"])
    df.show()

    # drop duplicates
    dfDistinct = df.dropDuplicates()
    print("Distinct:")
    dfDistinct.show()

    # drop duplicates other than ID column
    colsExceptID = [col for col in df.columns if col != "id"]
    print("Columns except [id]:", colsExceptID, ". Making distinct by them:")
    dfDistinctExId = dfDistinct.dropDuplicates(colsExceptID)
    dfDistinctExId.show()

    # some aggregations
    print("Some basic aggregations:")
    df.agg(
        sqlfun.count("id").alias("total_count"),
        sqlfun.countDistinct("id").alias("distinct_id"),
        sqlfun.countDistinct("name").alias("distinct_name")
    ).show()

    # add new id column
    print("With new created ID column")
    dfNewId = dfDistinctExId.withColumn("new_id", sqlfun.monotonically_increasing_id())
    dfNewId.show()

    # dealing with missed values
    print("Percentage of missing values for [age] column")
    dfDistinctExId.agg(
        (100 - sqlfun.count("age") / sqlfun.count("*") * 100).alias("age_missing")
    ).show()

    print("Percentage of missing values for all columns")
    dfDistinctExId.agg(*[
        # the * unpacks the list, so each expression is passed to agg() as a separate argument
        (100 - sqlfun.count(col) / sqlfun.count("*") * 100).alias(col + "_missing")
        for col in dfDistinctExId.columns
    ]).show()

    ####################################
    # filling missing values with MEAN
    ####################################
    # calculate mean
    means = dfDistinctExId.agg(*[
        sqlfun.mean(col).alias(col + "_mean")
        for col in dfDistinctExId.columns if col != "id" and col != "name"
    ])
    print("Calculated medians(means):")
    means.show()

    # creating dictionary for means
    meansRow = means.first()  # collect the single aggregated row once
    meansDict = {}
    for colName in means.columns:
        colNameForDict = colName.replace("_mean", "")
        meansDict[colNameForDict] = meansRow[colName]
    meansDict["name"] = "Unknown"
    print("Dictionary which will be used to fill missing values:")
    print(meansDict)

    # filling empty values using means from dictionary
    dfFull = dfDistinctExId.fillna(meansDict)
    print("With filled missing values:")
    dfFull.show()

    ###########
    # outliers
    ###########
    # calculate quartiles: Q0, Q1, Q2, Q3, Q4 for [age] column
    # last parameter is the relative error: 0 gives exact quantiles but can be really expensive
    qs = dfFull.approxQuantile("age", [0.0, 0.25, 0.5, 0.75, 1.0], 0.05)
    print("quartiles: Q0, Q1, Q2, Q3, Q4 for [age] column:", qs)

    # calculate threshold boundaries for several columns (age and salary) and filter data
    cols = ["age", "salary"]
    dfFiltered = dfFull
    for currentCol in cols:
        qs = dfFull.approxQuantile(currentCol, [0.0, 0.25, 0.5, 0.75, 1.0], 0.05)
        Q1 = qs[1]
        Q3 = qs[3]
        IQR = Q3 - Q1
        thresholdLow = Q1 - 1.5 * IQR
        thresholdHigh = Q3 + 1.5 * IQR
        print("For", currentCol, "low threshold is", thresholdLow, "and high threshold is", thresholdHigh)
        # keep values inside [thresholdLow, thresholdHigh] (between() is inclusive)
        dfFiltered = dfFiltered.filter(
            sqlfun.col(currentCol).between(thresholdLow, thresholdHigh))
    print("Filtered by thresholds:")
    dfFiltered.show()
```