Monday, November 18, 2019

Azure Databricks: RDDs, Data Frames and Datasets, Part 4

Today, we're going to continue talking about RDDs, Data Frames and Datasets in Azure Databricks.  If you haven't read the previous posts in this series, Introduction, Cluster Creation, Notebooks, Databricks File System (DBFS), Hive (SQL) Database and RDDs, Data Frames and Datasets (Part 1, Part 2, Part 3), they may provide some useful context.  If you are new to this material, it is especially important that you read through the previous posts (Part 1, Part 2, Part 3), as this post builds directly on what was discussed there.  You can find the files from this post in our GitHub Repository.  Let's move on to the core of this post: RDDs, Data Frames and Datasets.

Over the previous posts in this series (Part 1, Part 2, Part 3), we've written a number of code snippets that perform common data transformation tasks using RDDs, Data Frames and Datasets.  In this post, we're going to crank up the size of the data to see the performance differences between these methods.  Here's the code we used to create our new objects.

<CODE START>

for( i <- 1 to 1000){
  dbutils.fs.cp("/databricks-datasets/adult/adult.data", "/adultbig/adult" + i + ".data")
}

<CODE END>

<CODE START>

import org.apache.spark.sql.functions._

val rdd_lines = sc.textFile("/adultbig/")

val rdd_adult = rdd_lines
  .filter(f => f != "")
  .map(l => l.split(", "))

val df_adult_names = sqlContext.read
  .option("inferSchema", "true")
  .csv("/adultbig/").toDF(
    "age"
    ,"workclass"
    ,"fnlwgt"
    ,"education"
    ,"educationnum"
    ,"maritalstatus"
    ,"occupation"
    ,"relationship"
    ,"race"
    ,"sex"
    ,"capitalgain"
    ,"capitalloss"
    ,"hoursperweek"
    ,"nativecountry"
    ,"income"
  )

case class Adult (
  age: Int
  ,workclass: String
  ,fnlwgt: Double
  ,education: String
  ,educationnum: Double
  ,maritalstatus: String
  ,occupation: String
  ,relationship: String
  ,race: String
  ,sex: String
  ,capitalgain: Double
  ,capitalloss: Double
  ,hoursperweek: Double
  ,nativecountry: String
  ,income: String
)


val ds_adult = df_adult_names.as[Adult]

<CODE END>
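
As a quick sanity check (our own optional sketch, not part of the original test), we can confirm that the copy loop produced the expected number of files and get a rough total size using dbutils.fs.ls against the same folder used above.

<CODE START>

// Optional sanity check: list the copied files and total their size.
// This assumes the copy loop above completed successfully.
val files = dbutils.fs.ls("/adultbig/")
println(s"File count: ${files.length}")
println(s"Total size (bytes): ${files.map(_.size).sum}")

<CODE END>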

Here's a copy of the code that we are running to test our objects.  Due to Spark's lazy evaluation framework, we have to end each command with an action, such as .count, to force evaluation (the short illustration below shows this in isolation).  Since there are quite a few code segments, we'll use code blocks only to separate RDDs, Data Frames and Datasets, and break the individual code segments up with labeled comment lines for clarity.
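
As a quick illustration of that lazy evaluation point (this snippet is ours and is not part of the timed test), defining a transformation returns almost instantly, while the action at the end is what actually triggers the work.

<CODE START>

// Illustration only: transformations are lazy, actions trigger execution.
// Defining the mapped RDD returns almost immediately...
val rdd_lazy = rdd_adult.map(x => x(0).toInt * 12)

// ...while calling an action such as .count is what actually runs the job.
rdd_lazy.count

<CODE END>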

<CODE START>

rdd_adult.count

// Select

val rdd_select = rdd_adult
  .map(x => (x(0), x(14)))
rdd_select.count

// New Column

val rdd_col = rdd_adult
  .map(x => (x(0), x(0).toInt * 12))
rdd_col.count

// Filtering

val rdd_fil = rdd_adult
  .filter(x => x(14) == "<=50K")
rdd_fil.count

// Sum

val rdd_sum = rdd_adult
  .map(x => x(10).toDouble)
  .reduce((a, b) => a + b)
rdd_sum

// Average

val rdd_avg_tup = rdd_adult
  .map(x => x(0).toInt)
  .aggregate((0, 0))(
    (w, age) => (w._1 + age, w._2 + 1)
    ,(a, b) => (a._1 + b._1, a._2 + b._2)
  )
val rdd_avg = rdd_avg_tup._1.toDouble / rdd_avg_tup._2
rdd_avg

// Grouped Average

val rdd_aapi = rdd_adult
  .map(x => (x(14), x(0).toInt))
  .aggregateByKey((0, 0))(
    (w, age) => (w._1 + age, w._2 + 1)
    ,(a, b) => (a._1 + b._1, a._2 + b._2)
  )
  .map(x => (x._1, x._2._1.toDouble / x._2._2))
rdd_aapi.count

// Build the 10% RDD used for the join

val rdd_lines_100 = sc.textFile("/adult100/")

val rdd_adult_100 = rdd_lines_100
  .filter(f => f != "")
  .map(l => l.split(", "))

val rdd_aapi_100 = rdd_adult_100
  .map(x => (x(14), x(0).toInt))
  .aggregateByKey((0, 0))(
    (w, age) => (w._1 + age, w._2 + 1)
    ,(a, b) => (a._1 + b._1, a._2 + b._2)
  )
  .map(x => (x._1, x._2._1.toDouble / x._2._2))

// Join (10% of records)

val rdd_join_l = rdd_adult_100
  .map(x => (x(14), x))

val rdd_join_r = rdd_aapi_100
  .map(x => (x._1, x))

val rdd_join = rdd_join_l
  .join(rdd_join_r)
  .map(x => (
    x._2._1(0)
    ,x._2._1(1)
    ,x._2._1(2)
    ,x._2._1(3)
    ,x._2._1(4)
    ,x._2._1(5)
    ,x._2._1(6)
    ,x._2._1(7)
    ,x._2._1(8)
    ,x._2._1(9)
    ,x._2._1(10)
    ,x._2._1(11)
    ,x._2._1(12)
    ,x._2._1(13)
    ,x._2._1(14)
    ,x._2._2._2
  ))
rdd_join.count

// Single Sort

val rdd_sort = rdd_adult
  .sortBy(x => x(0))
rdd_sort.count

// Double Sort

val rdd_sort2 = rdd_adult
  .sortBy(x => (x(0), x(1)))
rdd_sort2.count

<CODE END>

<CODE START>

df_adult_names.count

// Select

val df_select = df_adult_names
  .select("age", "income")
df_select.count

// New Column

val df_col = df_adult_names
  .withColumn("ageinmonths", $"age" * 12)
  .select("age", "ageinmonths")
df_col.count

// Filtering

val df_fil = df_adult_names
  .filter(ltrim($"income") === "<=50K")
df_fil.count

// Sum

val df_sum = df_adult_names
  .agg(sum("capitalgain"))
  .first()
  .getDouble(0)
df_sum

// Average

val df_avg = df_adult_names
  .agg(avg("age"))
  .first()
  .getDouble(0)
df_avg

// Grouped Average

val df_aapi = df_adult_names
  .groupBy("income")
  .agg(avg("age"))
df_aapi.count

// Build the 10% Data Frame used for the join

val df_adult_100 = sqlContext.read
  .option("inferSchema", "true")
  .csv("/adult100/").toDF(
    "age"
    ,"workclass"
    ,"fnlwgt"
    ,"education"
    ,"educationnum"
    ,"maritalstatus"
    ,"occupation"
    ,"relationship"
    ,"race"
    ,"sex"
    ,"capitalgain"
    ,"capitalloss"
    ,"hoursperweek"
    ,"nativecountry"
    ,"income"
  )

val df_aapi_100 = df_adult_100
  .groupBy("income")
  .agg(avg("age"))

// Join (10% of records)

val df_join_r = df_aapi_100
  .withColumnRenamed("income", "rincome")

val df_join = df_adult_100
  .join(
    df_join_r
    ,df_adult_100("income") === df_join_r("rincome")
    ,"inner"
  )
  .drop("rincome")
df_join.count

// Single Sort

val df_sort = df_adult_names
  .orderBy("age")
df_sort.count

// Double Sort

val df_sort2 = df_adult_names
  .orderBy("age", "workclass")
df_sort2.count

<CODE END>

<CODE START>

ds_adult.count

// Select

case class Adult_select (
  age: Int
  ,income: String
)

val ds_select = ds_adult
  .select("age", "income")
  .as[Adult_select]
ds_select.count

// New Column

case class Adult_col (
  age: Int
  ,ageinmonths: Int
)

val ds_col = ds_adult
  .withColumn("ageinmonths", $"age" * 12)
  .select("age", "ageinmonths")
  .as[Adult_col]
ds_col.count

// Filtering

val ds_fil = ds_adult
  .filter(ltrim($"income") === "<=50K")
ds_fil.count

// Sum

val ds_sum = ds_adult
  .agg(sum("capitalgain"))
  .first()
  .getDouble(0)
ds_sum

// Average

val ds_avg = ds_adult
  .agg(avg("age"))
  .first()
  .getDouble(0)
ds_avg

// Grouped Average

case class Adult_aapi (
  income: String
  ,avgage: Double
)

val ds_aapi = ds_adult
  .groupBy("income")
  .agg(avg("age"))
  .withColumnRenamed("avg(age)", "avgage")
  .as[Adult_aapi]
ds_aapi.count

// Build the 10% Dataset used for the join

val ds_adult_100 = df_adult_100.as[Adult]

val ds_aapi_100 = ds_adult_100
  .groupBy("income")
  .agg(avg("age"))
  .withColumnRenamed("avg(age)", "avgage")
  .as[Adult_aapi]

// Join (10% of records)

case class Adult_join_r (
  rincome: String
  ,avgage: Double
)

val ds_join_r = ds_aapi_100
  .withColumnRenamed("income", "rincome")
  .as[Adult_join_r]

case class Adult_join (
  age: Int
  ,workclass: String
  ,fnlwgt: Double
  ,education: String
  ,educationnum: Double
  ,maritalstatus: String
  ,occupation: String
  ,relationship: String
  ,race: String
  ,sex: String
  ,capitalgain: Double
  ,capitalloss: Double
  ,hoursperweek: Double
  ,nativecountry: String
  ,income: String
  ,avgage: Double
)

val ds_join = ds_adult_100
  .join(
    ds_join_r
    ,ds_adult_100("income") === ds_join_r("rincome")
    ,"inner"
  )
  .drop("rincome")
  .as[Adult_join]
ds_join.count

// Single Sort

val ds_sort = ds_adult
  .orderBy("age")
ds_sort.count

// Double Sort

val ds_sort2 = ds_adult
  .orderBy("age", "workclass")
ds_sort2.count

<CODE END>
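
If you want to reproduce these measurements yourself, here is a minimal sketch of how an individual action could be timed.  The timeIt helper below is our own illustration and is not the mechanism used to capture the numbers in the results table.

<CODE START>

// Minimal timing helper (a sketch, not the original measurement code).
// Runs the given action, forces evaluation and reports elapsed seconds.
def timeIt[T](label: String)(block: => T): T = {
  val start = System.nanoTime()
  val result = block
  val seconds = (System.nanoTime() - start) / 1e9
  println(f"$label: $seconds%.1f s")
  result
}

// Example usage against the Data Frame count.
timeIt("Data Frame Count") { df_adult_names.count }

<CODE END>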

While performing this test, we found that the RDD join was continuously failing due to memory errors.  This was likely because our RDDs store every field as a string, while the Data Frames and Datasets use numeric types; however, we wanted to keep the code the same as in the previous posts.  So, we created a second set of objects, referenced above as "/adult100/", that uses only 10% of the total records, allowing the RDD join to complete.  A sketch of how that smaller folder could have been created is shown below, followed by the results from the test.
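
The code that built the smaller "/adult100/" folder isn't shown above, but a copy loop like the one we used for "/adultbig/", just with 100 copies instead of 1,000, would produce it.  This is an assumption based on the earlier loop, not code from the original test.

<CODE START>

// Sketch: build the 10% folder the same way as /adultbig/, but with 100 copies.
// Assumes /adult100/ was created with the same copy approach as above.
for( i <- 1 to 100){
  dbutils.fs.cp("/databricks-datasets/adult/adult.data", "/adult100/adult" + i + ".data")
}

<CODE END>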

Task                  | RDD  | Data Frame | Dataset
Count                 | 17.3 | 14.2       | 16.4
Select                | 16.4 | 13.4       | 13.8
New Column            | 16.8 | 13.7       | 12.9
Filtering             | 15.7 | 16.3       | 16.2
Sum                   | 15.9 | 16.2       | 15.7
Average               | 16.4 | 37.4       | 16.5
Grouped Average       | 17.6 | 16.0       | 14.9
Join (10% of records) | 27.8 | 5.6        | 5.1
Single Sort           | 91.2 | 32.6       | 30.2
Double Sort           | 95.4 | 32.6       | 34.8

Interestingly, we see that RDDs were rarely the most efficient option.  Even more surprisingly, Datasets were typically the most performant option, and even when they weren't, they were very close behind.

So, it seems like we've found a nice trichotomy here.  RDDs are incredibly flexible and can perform virtually any analysis, but that comes at the cost of code complexity and (possibly) performance.  Data Frames can perform many of the analyses that we typically see in Data Engineering scenarios with the least code complexity and good performance.  Datasets perform slightly better than Data Frames with similar code, slightly bloated by the explicit casting to case classes.

We hope this mini-series on data objects was as educational for you as it was for us.  This exercise has really opened our minds to what's possible in Azure Databricks.  Stay tuned for the next post, where we'll dig into Azure Databricks Delta Lake.  Thanks for reading.  We hope you found this informative.

Brad Llewellyn
Service Engineer - FastTrack for Azure
Microsoft
@BreakingBI
www.linkedin.com/in/bradllewellyn
llewellyn.wb@gmail.com