Monday, October 28, 2019

Azure Databricks: RDDs, Data Frames and Datasets, Part 3

Today, we're going to continue talking about RDDs, Data Frames and Datasets in Azure Databricks.  If you haven't read the previous posts in this series, Introduction, Cluster Creation, Notebooks, Databricks File System (DBFS), Hive (SQL) Database and RDDs, Data Frames and Datasets (Part 1, Part 2), they may provide some useful context.  If you are new to this material, it is especially important that you read through the previous posts (Part 1, Part 2), as this post will build directly on what was discussed there.  You can find the files from this post in our GitHub Repository.  Let's move on to the core of this post, RDDs, Data Frames and Datasets.

Before we begin, the following code block will create all of the necessary objects from the previous posts (Part 1, Part 2).

<CODE START>

import org.apache.spark.sql.functions._

val rdd_lines = sc.textFile("/databricks-datasets/adult/adult.data")

val rdd_adult = rdd_lines
  .filter(f => f != "")
  .map(l => l.split(", "))

val df_adult_nonames = sqlContext.read.csv("/databricks-datasets/adult/adult.data")

val df_adult_names = sqlContext.read
  .option("inferSchema", "true")
  .csv("/databricks-datasets/adult/adult.data").toDF(
    "age"
    ,"workclass"
    ,"fnlwgt"
    ,"education"
    ,"educationnum"
    ,"maritalstatus"
    ,"occupation"
    ,"relationship"
    ,"race"
    ,"sex"
    ,"capitalgain"
    ,"capitalloss"
    ,"hoursperweek"
    ,"nativecountry"
    ,"income"
  )

case class Adult (
  age: Int
  ,workclass: String
  ,fnlwgt: Double
  ,education: String
  ,educationnum: Double
  ,maritalstatus: String
  ,occupation: String
  ,relationship: String
  ,race: String
  ,sex: String
  ,capitalgain: Double
  ,capitalloss: Double
  ,hoursperweek: Double
  ,nativecountry: String
  ,income: String
)

val ds_adult = df_adult_names.as[Adult]

val rdd_select = rdd_adult
  .map(x => (x(0), x(14)))

val df_select = df_adult_names
  .select("age", "income")

case class Adult_select (
  age: Int
  ,income: String
)

val ds_select = ds_adult
  .select("age", "income")
  .as[Adult_select]

case class Adult_map (
  age: Int
  ,income: String
)

val ds_map = ds_adult
  .map(x => (x.age, x.income))
  .withColumnRenamed("_1", "age")
  .withColumnRenamed("_2", "income")
  .as[Adult_map]

val rdd_col = rdd_adult
  .map(x => (x(0), x(0).toInt * 12))

val df_col = df_adult_names
  .withColumn("ageinmonths", $"age" * 12)
  .select("age", "ageinmonths")

case class Adult_col (
  age: Int
  ,ageinmonths: Int
)

val ds_col = ds_adult
  .withColumn("ageinmonths", $"age" * 12)
  .select("age", "ageinmonths")
  .as[Adult_col]

val rdd_fil = rdd_adult
  .filter(x => x(14) == "<=50K")

val df_fil = df_adult_names
  .filter(ltrim($"income") === "<=50K")

val ds_fil = ds_adult
  .filter(ltrim($"income") === "<=50K")

val rdd_sum = rdd_adult
  .map(x => x(10).toDouble)
  .reduce((a, b) => a + b)

val df_sum = df_adult_names
  .agg(sum("capitalgain"))
  .first()
  .getDouble(0)

val ds_sum = ds_adult
  .agg(sum("capitalgain"))
  .first()
  .getDouble(0)

val rdd_avg_tup = rdd_adult
  .map(x => x(0).toInt)
  .aggregate((0, 0))(
    (w, age) => (w._1 + age, w._2 + 1)
    ,(a, b) => (a._1 + b._1, a._2 + b._2)
  )
val rdd_avg = rdd_avg_tup._1.toDouble / rdd_avg_tup._2

val df_avg = df_adult_names
  .agg(avg("age"))
  .first()
  .getDouble(0)

val ds_avg = ds_adult
  .agg(avg("age"))
  .first()
  .getDouble(0)

val ttuple = (1,2,3)
val tarray = Array(1,2,3)

val rdd_aapi = rdd_adult
  .map(x => (x(14), x(0).toInt))
  .aggregateByKey((0, 0))(
    (w, age) => (w._1 + age, w._2 + 1)
    ,(a, b) => (a._1 + b._1, a._2 + b._2)
  )
  .map(x => (x._1, x._2._1.toDouble / x._2._2))

val df_aapi = df_adult_names
  .groupBy("income")
  .agg(avg("age"))

case class Adult_aapi (
  income: String
  ,avgage: Double
)

val ds_aapi = ds_adult
  .groupBy("income")
  .agg(avg("age"))
  .withColumnRenamed("avg(age)", "avgage")
  .as[Adult_aapi]

<CODE END>

In the previous posts (Part 1, Part 2), we looked at a number of basic transformations and aggregations.  Now, let's take a look at joining objects together.  In the previous post, we created objects containing the average age per income level.  Let's try joining those back to the original objects.

Join RDD Code

Join RDD Output

<CODE START>

val rdd_join_l = rdd_adult
  .map(x => (x(14), x))

val rdd_join_r = rdd_aapi
  .map(x => (x._1, x))

val rdd_join = rdd_join_l
  .join(rdd_join_r)
  .map(x => (
    x._2._1(0)
    ,x._2._1(1)
    ,x._2._1(2)
    ,x._2._1(3)
    ,x._2._1(4)
    ,x._2._1(5)
    ,x._2._1(6)
    ,x._2._1(7)
    ,x._2._1(8)
    ,x._2._1(9)
    ,x._2._1(10)
    ,x._2._1(11)
    ,x._2._1(12)
    ,x._2._1(13)
    ,x._2._1(14)
    ,x._2._2._2
  ))

rdd_join.take(2)

<CODE END>


Join Data Frame

<CODE START>

val df_join_r = df_aapi
  .withColumnRenamed("income", "rincome")

val df_join = df_adult_names
  .join(
    df_join_r
    ,df_adult_names("income") === df_join_r("rincome")
    ,"inner"
  )
  .drop("rincome")

df_join.show(2)

<CODE END>


Join Dataset Code

Join Dataset Output

<CODE START>


case class Adult_join_r (
  rincome: String
  ,avgage: Double
)

val ds_join_r = ds_aapi
  .withColumnRenamed("income", "rincome")
  .as[Adult_join_r]

case class Adult_join (
  age: Int
  ,workclass: String
  ,fnlwgt: Double
  ,education: String
  ,educationnum: Double
  ,maritalstatus: String
  ,occupation: String
  ,relationship: String
  ,race: String
  ,sex: String
  ,capitalgain: Double
  ,capitalloss: Double
  ,hoursperweek: Double
  ,nativecountry: String
  ,income: String
  ,avgage: Double
)

val ds_join = ds_adult
  .join(
    ds_join_r
    ,ds_adult("income") === ds_join_r("rincome")
    ,"inner"
  )
  .drop("rincome")
  .as[Adult_join]

ds_join.show(2)

<CODE END>

We can make use of the built-in .join() function for RDDs.  Similar to the .aggregateByKey() function we saw in the previous post, the .join() function for RDDs requires a 2-element tuple, with the first element being the key and the second element being the value.  So, we need to use the .map() function to restructure our RDDs to store the keys in the first element and the original array/tuple in the second element.  After the join, we end up with an awkward nested structure of arrays and tuples that we need to restructure using another .map() function, leading to a lengthy code snippet.
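To visualize that nested structure, here's a minimal toy sketch with hypothetical data (the names toy_left and toy_right are just for illustration).  Each joined element comes back as (key, (leftValue, rightValue)), which is exactly why we need the second .map() function to flatten the result.

<CODE START>

val toy_left = sc.parallelize(Seq(("<=50K", "personA"), (">50K", "personB")))

val toy_right = sc.parallelize(Seq(("<=50K", 36.8), (">50K", 44.2)))

toy_left
  .join(toy_right)
  .collect()

// Returns something like Array((<=50K,(personA,36.8)), (>50K,(personB,44.2)))
// i.e. (key, (leftValue, rightValue)); element order may vary.

<CODE END>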

Data Frames and Datasets can also make use of their own .join() function, with the ability to define a binary join condition using the === notation we saw in the previous post.  Since Data Frames and Datasets have column names, we need to rename the key column in the right Data Frame or Dataset using the .withColumnRenamed() function to avoid duplicate column names.  After that, we can drop the right key using the .drop() function.  Once again, we see that the primary difference when working with Datasets is that we need to explicitly create a case class for each new Dataset we create.
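As a side note, when the join column has the same name on both sides, the .join() overload that accepts a Seq() of column names should let us skip the rename-and-drop dance entirely.  Here's a hedged sketch of that approach using the same objects (df_join_alt is just an illustrative name).

<CODE START>

val df_join_alt = df_adult_names
  .join(df_aapi, Seq("income"), "inner")

df_join_alt.show(2)

<CODE END>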

In this case, we wanted to perform an inner join.  However, we can also perform other join types using the leftOuterJoin(), rightOuterJoin() and fullOuterJoin() functions for RDDs.  Data Frames and Datasets have a bit more flexibility on this front, allowing us to pass in the following join types as strings: inner, cross, outer, full, full_outer, left, left_outer, right, right_outer, left_semi, left_anti.
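For instance, a left outer join would look something like the sketch below, reusing the objects from above.  This is purely illustrative, since every record in this dataset has a matching income level.

<CODE START>

val rdd_left = rdd_join_l
  .leftOuterJoin(rdd_join_r)
// Right-side values come back wrapped in an Option, e.g. Some((income, avgage)).

val df_left = df_adult_names
  .join(
    df_join_r
    ,df_adult_names("income") === df_join_r("rincome")
    ,"left_outer"
  )
  .drop("rincome")

df_left.show(2)

<CODE END>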

Next, let's try sorting our objects by age.
Sort RDD

<CODE START>

val rdd_sort = rdd_adult
  .sortBy(x => x(0))

rdd_sort.take(2)

<CODE END>


Sort Data Frame

<CODE START>

val df_sort = df_adult_names
  .orderBy("age")

df_sort.show(2)

<CODE END>


Sort Dataset

<CODE START>

val ds_sort = ds_adult
  .orderBy("age")

ds_sort.show(2)

<CODE END>

We were able to leverage the sortBy() function to sort the RDD.  All we needed to provide was an anonymous function that returns the values of interest.  The Data Frame and Dataset were similarly simple, using the .orderBy() function and a column name.  Since the structure of the Dataset didn't change, we didn't need to create a new case class for it.

However, sorting by age alone doesn't produce repeatable results, as age is not unique.  So, let's sort by age and workclass.
Sort RDD 2

<CODE START>

val rdd_sort2 = rdd_adult
  .sortBy(x => (x(0), x(1)))

rdd_sort2.take(2)

<CODE END>


Sort Data Frame 2

<CODE START>

val df_sort2 = df_adult_names
  .orderBy("age", "workclass")

df_sort2.show(2)

<CODE END>


Sort Dataset 2

<CODE START>

val ds_sort2 = ds_adult
  .orderBy("age", "workclass")

ds_sort2.show(2)

<CODE END>

We see that this is equally simple.  For RDDs, we need to return a tuple instead of a single value.  For Data Frames and Datasets, we simply pass the additional column into the orderBy() function.
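If we ever need a descending sort, the pattern changes only slightly.  Here's a quick sketch (the _desc names are just for illustration): the RDD .sortBy() function accepts an ascending flag, while Data Frames and Datasets can use the .desc and .asc column methods.

<CODE START>

val rdd_sort_desc = rdd_adult
  .sortBy(x => x(0).toInt, ascending = false)

val df_sort_desc = df_adult_names
  .orderBy($"age".desc, $"workclass".asc)

rdd_sort_desc.take(2)

df_sort_desc.show(2)

<CODE END>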

That's all of the functionality that we wanted to cover in this mini-series comparing the data storage objects.  Using the tools that we've found over these three posts, the majority of data transformation problems are within our grasp.  We've seen that, for the most part, Data Frames and Datasets provide much simpler and more readable code because of their SQL-esque nature.  But, there's still one more topic we need to cover, Performance.  Stay tuned for the next post where we'll ramp up the data volume to see if there are any substantial performance impacts of using these objects.  Thanks for reading.  We hope you found this informative.

Brad Llewellyn
Service Engineer - FastTrack for Azure
Microsoft
@BreakingBI
www.linkedin.com/in/bradllewellyn
llewellyn.wb@gmail.com

Monday, October 7, 2019

Azure Databricks: RDDs, Data Frames and Datasets, Part 2

Today, we're going to continue talking about RDDs, Data Frames and Datasets in Azure Databricks.  If you haven't read the previous posts in this series, Introduction, Cluster Creation, Notebooks, Databricks File System (DBFS), Hive (SQL) Database and RDDs, Data Frames and Datasets (Part 1), they may provide some useful context.  If you are new to this material, it is especially important that you read through the previous post, as this post will build directly on what was discussed there.  You can find the files from this post in our GitHub Repository.  Let's move on to the core of this post, RDDs, Data Frames and Datasets.

Before we begin, the following code block will create all of the necessary objects from the previous post.

<CODE START>

val rdd_lines = sc.textFile("/databricks-datasets/adult/adult.data")

val rdd_adult = rdd_lines
  .filter(f => f != "")
  .map(l => l.split(", "))

val df_adult_nonames = sqlContext.read.csv("/databricks-datasets/adult/adult.data")

val df_adult_names = sqlContext.read
  .option("inferSchema", "true")
  .csv("/databricks-datasets/adult/adult.data").toDF(
    "age"
    ,"workclass"
    ,"fnlwgt"
    ,"education"
    ,"educationnum"
    ,"maritalstatus"
    ,"occupation"
    ,"relationship"
    ,"race"
    ,"sex"
    ,"capitalgain"
    ,"capitalloss"
    ,"hoursperweek"
    ,"nativecountry"
    ,"income"
  )

case class Adult (
  age: Int
  ,workclass: String
  ,fnlwgt: Double
  ,education: String
  ,educationnum: Double
  ,maritalstatus: String
  ,occupation: String
  ,relationship: String
  ,race: String
  ,sex: String
  ,capitalgain: Double
  ,capitalloss: Double
  ,hoursperweek: Double
  ,nativecountry: String
  ,income: String
)

val ds_adult = df_adult_names.as[Adult]

val rdd_select = rdd_adult
  .map(x => (x(0), x(14)))

val df_select = df_adult_names
  .select("age", "income")

case class Adult_select (
  age: Int
  ,income: String
)

val ds_select = ds_adult
  .select("age", "income")
  .as[Adult_select]

case class Adult_map (
  age: Int
  ,income: String
)

val ds_map = ds_adult
  .map(x => (x.age, x.income))
  .withColumnRenamed("_1", "age")
  .withColumnRenamed("_2", "income")
  .as[Adult_map]

val rdd_col = rdd_adult
  .map(x => (x(0), x(0).toInt * 12))

val df_col = df_adult_names
  .withColumn("ageinmonths", $"age" * 12)
  .select("age", "ageinmonths")

case class Adult_col (
  age: Int
  ,ageinmonths: Int
)

val ds_col = ds_adult
  .withColumn("ageinmonths", $"age" * 12)
  .select("age", "ageinmonths")
  .as[Adult_col]

<CODE END>

In the previous post, we looked at selecting and creating columns.  Now, let's take it one step further by filtering each object down to only the records with income = "<=50K".

Filter RDD

<CODE START>

val rdd_fil = rdd_adult
  .filter(x => x(14) == "<=50K")

rdd_fil.count()

<CODE END>

Filter Data Frame

<CODE START>

import org.apache.spark.sql.functions._

val df_fil = df_adult_names
  .filter(ltrim($"income") === "<=50K")
df_fil.count()

<CODE END>

Filter Dataset

<CODE START>

import org.apache.spark.sql.functions._

val ds_fil = ds_adult
  .filter(ltrim($"income") === "<=50K")
ds_fil.count()

<CODE END>

We see that there are some differences between filtering RDDs, Data Frames and Datasets.  The first major difference is the same one we keep seeing: RDDs reference columns by index instead of by name.  There's also an interesting difference between the two-equals and three-equals operators.  Simply put, "==" is ordinary Scala equality and compares two values immediately, whereas "===" is Spark's Column operator, which builds an equality expression that gets evaluated against each row.  In the case of filter(), that expression compares the values in one column (income, in our case) to another column or a literal (the string "<=50K", in our case).  In other words, whenever we're comparing columns inside a Data Frame or Dataset expression, "===" is the way to go.
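To make the distinction concrete, here's a small sketch (the fil_expr and fil_typed names are just for illustration).  The first filter builds a Column expression with "===", while the second passes a plain Scala predicate to the typed Dataset API, where ordinary "==" on the field values is the right tool.

<CODE START>

val fil_expr = df_adult_names
  .filter(ltrim($"income") === "<=50K")

val fil_typed = ds_adult
  .filter(x => x.income.trim == "<=50K")

fil_expr.count()

fil_typed.count()

<CODE END>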

Interestingly, there was another difference caused by the way we imported our data.  Since we custom-built our RDD parsing algorithm to use <COMMA><SPACE> as the delimiter, we don't need to trim our RDD values.  However, we used the built-in sqlContext.read.csv() function for the Data Frame and Dataset, which doesn't trim by default.  So, we used the ltrim() function to remove the leading whitespace.  This function can be imported from the org.apache.spark.sql.functions library.
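As an aside, if we'd rather clean the data at load time, the CSV reader also has an ignoreLeadingWhiteSpace option that should remove the leading whitespace for us, letting us skip the ltrim() calls.  A quick sketch (df_adult_trimmed is just an illustrative name):

<CODE START>

val df_adult_trimmed = sqlContext.read
  .option("inferSchema", "true")
  .option("ignoreLeadingWhiteSpace", "true")
  .csv("/databricks-datasets/adult/adult.data")

// Without a header, Spark names the columns _c0 through _c14, so the income
// filter now works without trimming.  The .toDF() column list from earlier
// could be applied here as before.
df_adult_trimmed.filter($"_c14" === "<=50K").count()

<CODE END>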

As a nice added bonus, filtering doesn't change the structure of the records.  So, we didn't need to create a new case class for the Dataset.  Data Frames and Datasets also have access to the where() function, which is identical to filter().
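For completeness, here's what the same filter looks like with where(); we'd expect an identical count.

<CODE START>

val df_fil_where = df_adult_names
  .where(ltrim($"income") === "<=50K")

df_fil_where.count()

<CODE END>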

Now that we've seen how easy it is to filter, let's try calculating the sum of the "capitalgain" column.
Sum RDD

<CODE START>

val rdd_sum = rdd_adult
  .map(x => x(10).toDouble)
  .reduce((a, b) => a + b)

rdd_sum

<CODE END>

Sum Data Frame

 <CODE START>

import org.apache.spark.sql.functions._

val df_sum = df_adult_names
  .agg(sum("capitalgain"))
  .first()
  .getDouble(0)

df_sum

<CODE END>


Sum Dataset


<CODE START>

import org.apache.spark.sql.functions._

val ds_sum = ds_adult
  .agg(sum("capitalgain"))
  .first()
  .getDouble(0)

ds_sum

<CODE END>

Here, we see another substantial difference between RDDs, Data Frames and Datasets.  RDDs make use of the reduce() function.  Basically, we use the map() function to pass in only a single value for each element.  This is the capitalgain for each person.  Then, the reduce() function applies a binary function with two parameters, a and b, to "reduce" the capitalgains down into a single number.  However, it doesn't guarantee the order in which this happens.  So, a and b represent capitalgain values (or partial sums) from the RDD, and they will be added to each other until we arrive at a single, final number, the sum.  Since we can't control the order in which the values are reduced, the reduce() function requires our operation to be commutative and associative, which is the case for addition.
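As a quick aside, once the map() function has produced an RDD of Doubles, there's also a built-in sum() action that should give the same total as the map()/reduce() pair; a short sketch:

<CODE START>

val rdd_sum_alt = rdd_adult
  .map(x => x(10).toDouble)
  .sum()

rdd_sum_alt

<CODE END>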

We see that Data Frames and Datasets make this substantially easier via the use of the agg() function.  This function allows us to define aggregations against Data Frames or Datasets.  The complete list of aggregations that can be passed into the agg() function can be found here.  The agg() function outputs a 1-by-1 Data Frame, meaning that we need to retrieve the value using the first() and getDouble() functions.
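It's also worth noting that agg() can take several aggregations at once, returning a single-row Data Frame with one column per aggregation.  A quick sketch:

<CODE START>

val df_multi = df_adult_names
  .agg(sum("capitalgain"), avg("age"), max("hoursperweek"))

df_multi.show()

<CODE END>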

Next, let's move on to a more complex aggregation, average age.
Average RDD

<CODE START>

val rdd_avg_tup = rdd_adult
  .map(x => x(0).toInt)
  .aggregate((0, 0))(
    (w, age) => (w._1 + age, w._2 + 1)
    ,(a, b) => (a._1 + b._1, a._2 + b._2)
  )
val rdd_avg = rdd_avg_tup._1.toDouble / rdd_avg_tup._2

rdd_avg

<CODE END>


Average Data Frame

<CODE START>

import org.apache.spark.sql.functions._

val df_avg = df_adult_names
  .agg(avg("age"))
  .first()
  .getDouble(0)

df_avg

<CODE END>

Average Dataset

<CODE START>

import org.apache.spark.sql.functions._

val ds_avg = ds_adult
  .agg(avg("age"))
  .first()
  .getDouble(0)

ds_avg

<CODE END>

We see the complexity of RDD operations ramping up even further.  The reduce() function is fantastic for performing a single, basic operation that is commutative and associative.  "Average" does not fall in this category.  Instead, it's a quotient of two commutative, associative operations, sum and count, and division itself is neither commutative nor associative.  So, we need to use the more complex and flexible aggregate() function.

The first part of the aggregate function is the "zero value", i.e. the "starting value" for the aggregation.  In our case, we use the tuple (0, 0) to store the starting sum and starting count.  Then, we need to define the "within-partition" calculation.

This calculation takes in two variables, w and age.  "w" is the running accumulator, which starts at the zero value defined in the first part of the aggregate() function, while "age" comes from the RDD produced by the previous map() function.  Using "w" and "age", it builds a tuple within each partition of the RDD.  The first element of the tuple is the sum, calculated by adding all of the "age" values in that partition, and the second element is the count, calculated by adding 1 for each "age" value.

Once the (sum, count) tuple is calculated for each partition, the final calculation functions identically to the way that reduce() does.  It pulls the (sum, count) tuples from the previous step and combines them together in whatever order it feels like.  To turn a set of sums and counts into an overall sum and an overall count, all we need to do is add all of the sums together and add all of the counts together.

Finally, we calculate the average by dividing the overall sum by the overall count.  We also have to cast the sum to a double beforehand because, as is true in many programming languages, Int / Int = Int in Scala, which would give us an answer of 38, instead of 38.581...
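To see the whole flow in one place, here's a toy sketch with made-up numbers.  With two partitions of Seq(20, 30, 40, 50), each partition produces a (sum, count) tuple, such as (50, 2) and (90, 2), and the combine step adds them into (140, 4), giving an average of 35.0.

<CODE START>

val toy = sc.parallelize(Seq(20, 30, 40, 50), 2)

val toy_tup = toy
  .aggregate((0, 0))(
    (w, age) => (w._1 + age, w._2 + 1)
    ,(a, b) => (a._1 + b._1, a._2 + b._2)
  )

val toy_avg = toy_tup._1.toDouble / toy_tup._2

toy_avg

<CODE END>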

To add even more complexity, the syntax for pulling elements out of a Tuple is tuple._index, where the index starts at 1, and the syntax for pulling elements out of an Array is array(index), where the index starts at 0.
Tuple

<CODE START>

val ttuple = (1,2,3)

ttuple._1

<CODE END>


Array

<CODE START>

val tarray = Array(1,2,3)

tarray(0)

<CODE END>

On a completely separate, but similarly complex topic, we also haven't discussed partitions at this point.  Simply put, Spark is a distributed processing framework and many objects that we use are split across partitions to allow for parallel processing.  RDDs, Data Frames and Datasets are some of these objects.  Managing partitions is a more advanced topic that we may discuss in a later post.
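If you're curious in the meantime, the sketch below shows a couple of built-in functions that report and change the partition count; the exact numbers you see will depend on your cluster, and 8 is just an arbitrary example.

<CODE START>

rdd_adult.getNumPartitions

df_adult_names.rdd.getNumPartitions

val df_repart = df_adult_names.repartition(8)

df_repart.rdd.getNumPartitions

<CODE END>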

Moving on, we see that Data Frames and Datasets make this operation much easier by providing a built-in avg() function.  For our final transformation in this post, let's calculate average age per income level.
Average Age by Income RDD

<CODE START>

val rdd_aapi = rdd_adult
  .map(x => (x(14), x(0).toInt))
  .aggregateByKey((0, 0))(
    (w, age) => (w._1 + age, w._2 + 1)
    ,(a, b) => (a._1 + b._1, a._2 + b._2)
  )
  .map(x => (x._1, x._2._1.toDouble / x._2._2))

rdd_aapi.collect()

<CODE END>


Average Age by Income Data Frame

<CODE START>

import org.apache.spark.sql.functions._

val df_aapi = df_adult_names
  .groupBy("income")
  .agg(avg("age"))

df_aapi.show()

<CODE END>

Average Age by Income Dataset

<CODE START>

import org.apache.spark.sql.functions._

case class Adult_aapi (
  income: String
  ,avgage: Double
)

val ds_aapi = ds_adult
  .groupBy("income")
  .agg(avg("age"))
  .withColumnRenamed("avg(age)", "avgage")
  .as[Adult_aapi]

ds_aapi.show()

<CODE END>

The RDD code is almost identical to the previous example.  However, there are a couple of differences.  First, we are creating a 2-element tuple in the first map() function, with the first element being the key, "income" in this case, and the second element being the value, "age" in this case.  Then, we use the aggregateByKey() function to perform the exact same aggregations as before.  This time, however, it will aggregate the values by implicitly pulling them from the second element of the tuples passed in from the previous map() function.  It's important to note that we don't assign keys or values at any point here.  They are assumed to be the first and second elements of the tuples, respectively.

Next, since the aggregateByKey() function can output more than one element (one per key), it returns an RDD instead of a single tuple.  So, we can perform our final division by using another map() function, returning the income and average age values as an RDD of tuples.

Again, the Data Frame and Dataset code is drastically simpler, relying heavily on the groupBy() and agg() functions to do the heavy lifting.  Also, since the result now contains more than one record, we keep it as a Dataset instead of extracting a single value, which means we have to explicitly cast it with a case class again.  This also requires a column rename, because the default column name, "avg(age)", contains parentheses, which aren't allowed in case class field names.
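As a small refinement, the rename should be avoidable by aliasing the aggregation inline, so the output column is named "avgage" from the start.  Here's a sketch reusing the Adult_aapi case class (ds_aapi_alias is just an illustrative name).

<CODE START>

val ds_aapi_alias = ds_adult
  .groupBy("income")
  .agg(avg("age").alias("avgage"))
  .as[Adult_aapi]

ds_aapi_alias.show()

<CODE END>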

Hopefully this post shed a little more light on the data transformation methods in Azure Databricks.  We're starting to notice a trend of RDD operations being much more complex than their Data Frame and Dataset counterparts.  Does this trend continue with other types of transformations?  Does it buy us any potential performance improvements?  Stay tuned for the next post where we'll dig even deeper into these questions.  Thanks for reading.  We hope you found this informative.

Brad Llewellyn
Service Engineer - FastTrack for Azure
Microsoft
@BreakingBI
www.linkedin.com/in/bradllewellyn
llewellyn.wb@gmail.com