Before we begin, the following code block will create all of the necessary objects from the previous posts (Part 1, Part 2).
```scala
import org.apache.spark.sql.functions._
import spark.implicits._ // $"col" syntax and encoders (Databricks notebooks import this automatically)

// RDD: read raw lines, drop empty lines, split each line into fields
val rdd_lines = sc.textFile("/databricks-datasets/adult/")
val rdd_adult = rdd_lines
  .filter(f => f != "")
  .map(l => l.split(", "))

// Data Frames: without and with column names
val df_adult_nonames = spark.read.csv("/databricks-datasets/adult/")
val df_adult_names = spark.read
  .option("inferSchema", "true")
  .csv("/databricks-datasets/adult/")
  .toDF("age", "workclass", "fnlwgt", "education", "educationnum", "maritalstatus",
    "occupation", "relationship", "race", "sex", "capitalgain", "capitalloss",
    "hoursperweek", "nativecountry", "income")

case class Adult (
  age: Int
  ,workclass: String
  ,fnlwgt: Double
  ,education: String
  ,educationnum: Double
  ,maritalstatus: String
  ,occupation: String
  ,relationship: String
  ,race: String
  ,sex: String
  ,capitalgain: Double
  ,capitalloss: Double
  ,hoursperweek: Double
  ,nativecountry: String
  ,income: String
)
val ds_adult = df_adult_names.as[Adult]

// Selecting columns
val rdd_select = rdd_adult
  .map(x => (x(0), x(14)))

val df_select = df_adult_names
  .select("age", "income")

case class Adult_select (
  age: Int
  ,income: String
)

val ds_select = ds_adult
  .select("age", "income")
  .as[Adult_select]

case class Adult_map (
  age: Int
  ,income: String
)

val ds_map = ds_adult
  .map(x => (x.age, x.income))
  .withColumnRenamed("_1", "age")
  .withColumnRenamed("_2", "income")
  .as[Adult_map]

// Adding a computed column
val rdd_col = rdd_adult
  .map(x => (x(0), x(0).toInt * 12))

val df_col = df_adult_names
  .withColumn("ageinmonths", $"age" * 12)
  .select("age", "ageinmonths")

case class Adult_col (
  age: Int
  ,ageinmonths: Int
)

val ds_col = ds_adult
  .withColumn("ageinmonths", $"age" * 12)
  .select("age", "ageinmonths")
  .as[Adult_col]

// Filtering (the CSV reader keeps the leading space in each field, hence ltrim)
val rdd_fil = rdd_adult
  .filter(x => x(14) == "<=50K")

val df_fil = df_adult_names
  .filter(ltrim($"income") === "<=50K")

val ds_fil = ds_adult
  .filter(ltrim($"income") === "<=50K")

// Sum of capital gains
val rdd_sum = rdd_adult
  .map(x => x(10).toDouble)
  .reduce((a, b) => a + b)

val df_sum = df_adult_names
  .agg(sum("capitalgain"))

val ds_sum = ds_adult
  .agg(sum("capitalgain"))

// Average age
val rdd_avg_tup = rdd_adult
  .map(x => x(0).toInt)
  .aggregate((0, 0))(
    (w, age) => (w._1 + age, w._2 + 1)
    ,(a, b) => (a._1 + b._1, a._2 + b._2)
  )
val rdd_avg = rdd_avg_tup._1.toDouble / rdd_avg_tup._2

val df_avg = df_adult_names
  .agg(avg("age"))

val ds_avg = ds_adult
  .agg(avg("age"))

val ttuple = (1,2,3)
val tarray = Array(1,2,3)

// Average age per income level
val rdd_aapi = rdd_adult
  .map(x => (x(14), x(0).toInt))
  .aggregateByKey((0, 0))(
    (w, age) => (w._1 + age, w._2 + 1)
    ,(a, b) => (a._1 + b._1, a._2 + b._2)
  )
  .map(x => (x._1, x._2._1.toDouble / x._2._2))

val df_aapi = df_adult_names
  .groupBy("income")
  .agg(avg("age"))

case class Adult_aapi (
  income: String
  ,avgage: Double
)

val ds_aapi = ds_adult
  .groupBy("income")
  .agg(avg("age"))
  .withColumnRenamed("avg(age)", "avgage")
  .as[Adult_aapi]
```
In the previous posts (Part 1, Part 2), we've looked at a number of basic transformations and aggregations. Now, let's take a look at joining objects together. In the previous post, we created objects containing the average age per income level. Let's try joining those into the original objects.
[Figures: Join RDD Code, Join RDD Output]
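```scala
val rdd_join_l = rdd_adult
  .map(x => (x(14), x))
val rdd_join_r = rdd_aapi
  .map(x => (x._1, x))
// After .join() each element is (income, (Array[String], (income, avgage)))
val rdd_join = rdd_join_l
  .join(rdd_join_r)
  .map(x => (x._2._1(0), x._2._1(1), x._2._1(2), x._2._1(3), x._2._1(4),
    x._2._1(5), x._2._1(6), x._2._1(7), x._2._1(8), x._2._1(9), x._2._1(10),
    x._2._1(11), x._2._1(12), x._2._1(13), x._2._1(14), x._2._2._2))
```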
[Figure: Join Data Frame]
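```scala
// Rename the right-hand key so the result has no duplicate "income" column
val df_join_r = df_aapi
  .withColumnRenamed("income", "rincome")
val df_join = df_adult_names
  .join(df_join_r
    ,df_adult_names("income") === df_join_r("rincome"))
  .drop("rincome")
```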
[Figures: Join Dataset Code, Join Dataset Output]
```scala
case class Adult_join_r (
  rincome: String
  ,avgage: Double
)

// Rename the right-hand key so the result has no duplicate "income" column
val ds_join_r = ds_aapi
  .withColumnRenamed("income", "rincome")
  .as[Adult_join_r]

case class Adult_join (
  age: Int
  ,workclass: String
  ,fnlwgt: Double
  ,education: String
  ,educationnum: Double
  ,maritalstatus: String
  ,occupation: String
  ,relationship: String
  ,race: String
  ,sex: String
  ,capitalgain: Double
  ,capitalloss: Double
  ,hoursperweek: Double
  ,nativecountry: String
  ,income: String
  ,avgage: Double
)

// Inner join on income, drop the renamed key, then type the result
val ds_join = ds_adult
  .join(ds_join_r
    ,ds_adult("income") === ds_join_r("rincome"))
  .drop("rincome")
  .as[Adult_join]
```
We can make use of the built-in .join() function for RDDs. Similar to the .aggregateByKey() function we saw in the previous post, .join() only works on RDDs of 2-element tuples, where the first element is the key and the second element is the value. So, we need to use the .map() function to restructure our RDDs so that the key (income) is stored in the first element and the original array/tuple is stored in the second element. After the join, each element has the nested form (key, (left value, right value)), which here is (income, (array of fields, (income, average age))). We need another .map() function to flatten this awkward structure of arrays and tuples back into a single tuple, leading to a lengthy code snippet.
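To make that nested shape concrete, here is a minimal sketch in plain Scala (standard collections only, no Spark) of what an inner join on key-value pairs produces. The `PairJoinSketch` object, the `innerJoin` helper and the three-field rows are hypothetical simplifications of `rdd_adult` and `rdd_aapi`, not Spark APIs; on a real RDD you would call `.join()`, and the order of the results would not be guaranteed.

```scala
object PairJoinSketch {
  // Inner join of two keyed collections: each left pair is matched with every
  // right pair that has the same key, giving (key, (leftValue, rightValue))
  def innerJoin[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Seq[(K, (V, W))] =
    for {
      (k, v) <- left
      (k2, w) <- right
      if k == k2
    } yield (k, (v, w))

  // Hypothetical rows (age, workclass, income), already split like rdd_adult
  val rows: Seq[Array[String]] = Seq(
    Array("39", "State-gov", "<=50K"),
    Array("52", "Self-emp-inc", ">50K"),
    Array("28", "Private", "<=50K")
  )

  // Hypothetical per-income averages shaped like rdd_aapi: (income, avgage)
  // (33.5 and 52.0 are simply the averages of the toy rows above)
  val avgByIncome: Seq[(String, Double)] = Seq(("<=50K", 33.5), (">50K", 52.0))

  // Key both sides by income and keep the whole record as the value
  val left: Seq[(String, Array[String])] = rows.map(x => (x(2), x))
  val right: Seq[(String, (String, Double))] = avgByIncome.map(x => (x._1, x))

  // Nested result: (income, (Array[String], (income, avgage)))
  val joined: Seq[(String, (Array[String], (String, Double)))] = innerJoin(left, right)

  // Flatten into one tuple per row: (age, workclass, income, avgage)
  val flat: Seq[(String, String, String, Double)] =
    joined.map(x => (x._2._1(0), x._2._1(1), x._2._1(2), x._2._2._2))
}
```

Because each joined element is `(key, (leftValue, rightValue))`, reaching the average age takes `x._2._2._2`, which is why the flattening `.map()` needs such deep tuple accessors.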
Data Frames and Datasets can also make use of their own .join() function, with the ability to define a boolean join condition using the === notation we saw in the previous post. Since Data Frames and Datasets have column names, and both sides contain a column named income, we need to rename the key column in the right Data Frame or Dataset using the .withColumnRenamed() function to avoid duplicate column names. After the join, we can drop the right key using the .drop() function. Once again, we see that the primary difference when working with Datasets is that we need to explicitly create a case class for each new Dataset structure we create.
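The rename, join and drop steps change the shape of each row, which is why the Dataset version needs the new `Adult_join` case class. As a rough analogy in plain Scala (no Spark; the case classes and values below are hypothetical, trimmed-down stand-ins for `Adult`, `Adult_join_r` and `Adult_join`), the join condition compares `income` with `rincome`, and simply not copying `rincome` into the output plays the role of `.drop()`.

```scala
object CaseClassJoinSketch {
  // Hypothetical, trimmed-down stand-ins for Adult, Adult_join_r and Adult_join
  case class Person(age: Int, income: String)
  case class IncomeAvg(rincome: String, avgage: Double)
  case class PersonWithAvg(age: Int, income: String, avgage: Double)

  val people: Seq[Person] = Seq(Person(39, "<=50K"), Person(52, ">50K"), Person(28, "<=50K"))
  val avgs: Seq[IncomeAvg] = Seq(IncomeAvg("<=50K", 33.5), IncomeAvg(">50K", 52.0))

  // Keep pairs where income matches rincome (the join condition), then build
  // the new row type without rincome (the equivalent of .drop("rincome"))
  val joined: Seq[PersonWithAvg] = for {
    p <- people
    a <- avgs
    if p.income == a.rincome
  } yield PersonWithAvg(p.age, p.income, a.avgage)
}
```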
In this case, we wanted to perform an inner join. However, we can also perform other join types using the leftOuterJoin(), rightOuterJoin() and fullOuterJoin() functions for RDDs. Data Frames and Datasets have a bit more flexibility on this front, allowing us to pass one of the following join types as a string in the third argument of .join(): inner, cross, outer, full, full_outer, left, left_outer, right, right_outer, left_semi, left_anti.
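To illustrate which rows a few of those join types keep, here is a plain-Scala sketch over two small hypothetical keyed collections (no Spark involved; all names are illustrative). It mirrors the RDD convention, where `leftOuterJoin()` wraps the right-hand value in an `Option`; a Data Frame outer join fills the unmatched columns with null instead. The semi and anti variants return only left-hand rows.

```scala
object JoinTypesSketch {
  // Hypothetical keyed data: "b" exists only on the left, "d" only on the right
  val left: Seq[(String, Int)] = Seq(("a", 1), ("b", 2), ("c", 3))
  val right: Seq[(String, String)] = Seq(("a", "x"), ("c", "y"), ("d", "z"))

  // All right-hand values whose key equals k
  private def matches(k: String): Seq[String] = right.filter(_._1 == k).map(_._2)

  // inner: only keys present on both sides
  val inner: Seq[(String, (Int, String))] =
    for ((k, v) <- left; w <- matches(k)) yield (k, (v, w))

  // left_outer (RDD leftOuterJoin): every left row; unmatched rows get None
  val leftOuter: Seq[(String, (Int, Option[String]))] = left.flatMap { case (k, v) =>
    val ws = matches(k)
    if (ws.isEmpty) Seq((k, (v, Option.empty[String])))
    else ws.map(w => (k, (v, Option(w))))
  }

  // left_semi: left rows with at least one match, left-hand values only
  val leftSemi: Seq[(String, Int)] = left.filter { case (k, _) => matches(k).nonEmpty }

  // left_anti: left rows with no match at all
  val leftAnti: Seq[(String, Int)] = left.filter { case (k, _) => matches(k).isEmpty }
}
```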
Next, let's try sorting our objects by age.
[Figure: Sort RDD]
```scala
// RDD fields are Strings, so convert age to Int for a numeric sort
val rdd_sort = rdd_adult
  .sortBy(x => x(0).toInt)
```
[Figure: Sort Data Frame]
```scala
val df_sort = df_adult_names
  .orderBy("age")
```
[Figure: Sort Dataset]
```scala
val ds_sort = ds_adult
  .orderBy("age")
```
We were able to leverage the .sortBy() function to sort the RDD. All we needed to provide was an anonymous function that returns the sort key. The Data Frame and Dataset were similarly simple, using the .orderBy() function and a column name. Since the structure of the Dataset didn't change, we didn't need to create a new case class for it.
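One detail worth noting for the RDD version: after `split(", ")`, every field of `rdd_adult` is a `String`, so the type returned by the anonymous function decides how rows are compared. The sketch below uses plain Scala collections and hypothetical rows (no Spark) to show that string keys sort character by character, while converting with `.toInt` sorts numerically.

```scala
object SortKeySketch {
  // Hypothetical rows whose first field is an age stored as a String
  val rows: Seq[Array[String]] = Seq(Array("9", "b"), Array("45", "a"), Array("17", "c"))

  // String keys compare character by character: "17" < "45" < "9"
  val byString: Seq[String] = rows.sortBy(x => x(0)).map(x => x(0))

  // Int keys compare numerically: 9 < 17 < 45
  val byInt: Seq[String] = rows.sortBy(x => x(0).toInt).map(x => x(0))
}
```

The two orders agree only when every value has the same number of digits, so converting the key is the safer choice.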
However, sorting by age alone doesn't produce repeatable results: age is not unique, so rows with the same age can come back in any order. To break some of those ties, let's sort by age and workclass.
[Figure: Sort RDD 2]
```scala
// Sort by a (numeric age, workclass) tuple
val rdd_sort2 = rdd_adult
  .sortBy(x => (x(0).toInt, x(1)))
```
[Figure: Sort Data Frame 2]
```scala
val df_sort2 = df_adult_names
  .orderBy("age", "workclass")
```
[Figure: Sort Dataset 2]
```scala
val ds_sort2 = ds_adult
  .orderBy("age", "workclass")
```
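We see that this is equally simple. For RDDs, the anonymous function returns a tuple instead of a single value; tuples are compared element by element, so age is compared first and workclass only breaks ties. For Data Frames and Datasets, we pass the additional column into the .orderBy() function.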
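Here is a small plain-Scala sketch (hypothetical rows, no Spark) of the tuple ordering that the multi-key `sortBy` relies on: the first element is compared first, and later elements only break ties. RDD `.sortBy()` picks up the same implicit `Ordering` for tuples. Negating a numeric element is one simple way to reverse just that part of the key.

```scala
object TupleSortSketch {
  // Hypothetical (age, workclass) pairs with a tie on age 39
  val rows: Seq[(Int, String)] = Seq((39, "State-gov"), (28, "Private"), (39, "Private"))

  // Ordering on tuples compares age first; workclass only breaks ties
  val sorted: Seq[(Int, String)] = rows.sortBy(x => (x._1, x._2))

  // Negating age reverses that part of the key while workclass stays ascending
  val ageDesc: Seq[(Int, String)] = rows.sortBy(x => (-x._1, x._2))
}
```

For Data Frames and Datasets, the equivalent of the descending variant would be `.orderBy($"age".desc, $"workclass")`.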
That's all of the functionality that we wanted to cover in this mini-series comparing the data storage objects. Using the tools that we've covered over these three posts, the majority of data transformation problems are within our grasp. We've seen that, for the most part, Data Frames and Datasets provide much simpler and more readable code because of their SQL-esque nature. But there's still one more topic we need to cover: performance. Stay tuned for the next post, where we'll ramp up the data volume to see whether using these objects has any substantial performance impact. Thanks for reading. We hope you found this informative.
Brad Llewellyn
Service Engineer - FastTrack for Azure