Monday, September 16, 2019

Azure Databricks: RDDs, Data Frames and Datasets, Part 1

Today, we're going to talk about RDDs, Data Frames and Datasets in Azure Databricks.  If you haven't read the previous posts in this series, Introduction, Cluster Creation, Notebooks, Databricks File System (DBFS) and Hive (SQL) Database, they may provide some useful context.  You can find the files from this post in our GitHub Repository.  Let's move on to the core of this post, RDDs, Data Frames and Datasets.

It could be argued that the most important component of any Data Analysis is the component that contains the data.  In Spark, there are a number of different data containers, primarily RDDs, Data Frames and Datasets.  So, what are these containers and when should we use them?

A Resilient Distributed Dataset, aka RDD, is "a fault-tolerant collection of elements that can be operated on in parallel."  In layman's terms, an RDD is a collection of "things", nothing more.  These "things" can take many forms, including sentences from a book, records from a database or machine data represented in binary.  To help us visualize, here's a basic RDD created from a CSV file.  For reasons we'll touch on later, we'll be using Scala throughout this post.
Lines RDD

<CODE START>

val rdd_lines = sc.textFile("/databricks-datasets/adult/adult.data")
rdd_lines.take(2)

<CODE END>

We see that creating an RDD can be done with one easy function.  In this snippet, sc represents the default SparkContext.  This is extremely important, but is better left for a later post.  SparkContext offers the .textFile() function, which creates an RDD from a text file, parsing each line into its own element in the RDD.  These lines happen to represent CSV records.  However, there are many other common examples that use lines of free text.

It should also be noted that we can use the .collect() and .take() functions to view the contents of an RDD.  The difference between .collect() and .take() is that .take() allows us to specify the number of elements we want to retrieve, whereas .collect() returns the entire RDD.
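To make the difference concrete, here's a small sketch that contrasts the two using the rdd_lines RDD from above; the variable names are just illustrative.

<CODE START>

// .take(n) pulls back only n elements.
val first_five = rdd_lines.take(5)

// .collect() brings the entire RDD back to the driver, which is fine for this
// small file but risky on large datasets.
val everything = rdd_lines.collect()

println(s"take(5) returned ${first_five.length} elements; collect() returned ${everything.length}")

<CODE END>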

There's also an interesting quirk in the way that Scala displays arrays.  Scala displays each element of the array, separating the elements with commas, but it doesn't display quotes around string values.  In our case, since our elements are actually CSV records, it ends up looking like each value is an individual element.  This obviously isn't the case because we used .take(2) to display them.  We added the red line in the screenshot above to show where the two elements split.
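One simple way to sidestep the display quirk is to print each element on its own line; here's a tiny sketch using the same RDD.

<CODE START>

// Print each element on its own line so the record boundaries are obvious.
rdd_lines.take(2).foreach(println)

<CODE END>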

While we could use this RDD for the rest of this post, it will probably make more sense later if we split the data along the commas to create something that more closely resembles a real-world dataset.
Adult RDD

<CODE START>

val rdd_adult = rdd_lines
  .filter(f => f != "")
  .map(l => l.split(", "))
rdd_adult.take(2)

<CODE END>

The code to transform the raw lines of CSV text into their own arrays is a little more complex and deserves its own post.  Here are some good links for getting started with Transformations and Actions.  Fortunately, splitting the elements into their own sub-arrays also fixed the awkward display issue.
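As a quick taste of that topic, here's a minimal sketch of the transformation/action split, reusing the rdd_lines RDD from above: transformations like .filter() are lazy and merely describe the work, while an action like .count() actually triggers it.

<CODE START>

// Transformations are lazy: nothing executes when this line runs.
val non_empty_lines = rdd_lines.filter(f => f != "")

// Actions trigger execution: the Spark job actually runs here.
val line_count = non_empty_lines.count()

<CODE END>

Next, let's look at Data Frames.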
Unnamed and Untyped Data Frame

<CODE START>

val df_adult_nonames = sqlContext.read.csv("/databricks-datasets/adult/adult.data")
df_adult_nonames.show(2)

<CODE END>

We see that creating a Data Frame from a CSV is a good deal simpler than creating the RDD was.  The .read.csv() function abstracts away what we had to do manually before.  A major difference is that we create Data Frames using a SQLContext, which is a layer on top of the SparkContext that allows us to perform more SQL-like operations.  Again, contexts are very important and deserve their own post.
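To appreciate what that abstraction saves us, here's a rough sketch of building a (partial) Data Frame by hand from the rdd_adult RDD we created earlier; the two-column subset is our own simplification for brevity.

<CODE START>

// Importing the implicits gives us .toDF() on RDDs of tuples.
import sqlContext.implicits._

// Build a small, two-column Data Frame by hand from the split RDD.
val df_manual = rdd_adult
  .map(a => (a(0), a(14)))
  .toDF("age", "income")

df_manual.show(2)

<CODE END>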

Storing the data as a Data Frame gives us the substantial advantage of storing each individual data point in a named column.  This allows us to easily manipulate individual columns.  We'll see more on this later.  The previous screenshot showed that we can create Data Frames without supplying any column information at all.  However, we also have the ability to supply the column names so we don't have to use the default ones.
Named and Typed Data Frame

<CODE START>

val df_adult_names = sqlContext.read
  .option("inferSchema", "true")
  .csv("/databricks-datasets/adult/adult.data").toDF(
    "age"
    ,"workclass"
    ,"fnlwgt"
    ,"education"
    ,"educationnum"
    ,"maritalstatus"
    ,"occupation"
    ,"relationship"
    ,"race"
    ,"sex"
    ,"capitalgain"
    ,"capitalloss"
    ,"hoursperweek"
    ,"nativecountry"
    ,"income"
  )
df_adult_names.show(2)

<CODE END>

We see that the .option() and .toDF() functions allow us to infer the schema and pass the column names easily.  Another difference between RDDs and Data Frames is that we display Data Frames using the .show() function instead of .take().  Interestingly, the .collect() and .take() functions can still be used to pull the Rows out of a Data Frame as a local array.  Data Frames and Datasets also have a .rdd attribute that converts them back into true RDDs.
Take from Data Frame

<CODE START>

df_adult_names.take(2)

<CODE END>
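And here's a quick look at the .rdd route mentioned above; the result is an RDD of Row objects that we can treat like any other RDD.

<CODE START>

// Convert the Data Frame back into an RDD of Row objects.
val rdd_from_df = df_adult_names.rdd
rdd_from_df.take(2)

<CODE END>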

Speaking of Datasets, let's take a look at one.  Basically, Data Frames are a special type of Dataset.  Technically speaking, a Data Frame is an UNTYPED Dataset of Rows.  A Row is designed to closely resemble what a record from a relational table would look like: it contains a fixed number of elements that represent individual objects.  We can also use STRONGLY-TYPED Datasets of custom classes, other than Rows.  For instance, we can create a Dataset out of our named and typed Data Frame by explicitly creating a case class for it.
Dataset

<CODE START>

case class Adult (
  age: Int
  ,workclass: String
  ,fnlwgt: Double
  ,education: String
  ,educationnum: Double
  ,maritalstatus: String
  ,occupation: String
  ,relationship: String
  ,race: String
  ,sex: String
  ,capitalgain: Double
  ,capitalloss: Double
  ,hoursperweek: Double
  ,nativecountry: String
  ,income: String
)

val ds_adult = df_adult_names.as[Adult]

ds_adult.show(2)

<CODE END>

Having a strongly-typed Dataset does offer a few advantages from a deep technical perspective.  Simply put, if we can be certain that an object is of a certain type, then we can build more robust applications.  Interestingly, a major difference between Data Frames and Datasets is that Datasets can be reverted back to the original objects, whereas Data Frames cannot.  For instance, assume that we have an RDD of objects.  If we cast this RDD to a Dataset and back to an RDD, then we will have our original RDD back.  However, if we cast this RDD to a Data Frame and back to an RDD, then we will have a new RDD of Rows because the Data Frame can only hold Rows.
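Here's a minimal sketch of that round trip, using a small, hypothetical Person case class rather than the Adult data; it assumes the spark session and its implicits are available, as they are by default in a Databricks notebook.

<CODE START>

import spark.implicits._

case class Person (
  name: String
  ,age: Int
)

val rdd_people = sc.parallelize(Seq(Person("Ada", 36), Person("Alan", 41)))

// RDD -> Dataset -> RDD gives us our Person objects back.
val rdd_back_from_ds = rdd_people.toDS().rdd

// RDD -> Data Frame -> RDD gives us generic Rows instead.
val rdd_back_from_df = rdd_people.toDF().rdd

<CODE END>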

Now, let's try a few basic data manipulation operations.  First, let's select two "columns" from our data, age and income.
Select RDD

<CODE START>

val rdd_select = rdd_adult
  .map(x => (x(0), x(14)))

rdd_select.take(2)

<CODE END>


Select Data Frame

<CODE START>

val df_select = df_adult_names
  .select("age", "income")

df_select.show(2)

<CODE END>

Select Dataset

<CODE START>

val ds_select = ds_adult
  .select("age", "income")
ds_select.show(2)

<CODE END>

We see that RDDs require the awkward .map() function, where we build an anonymous function and reference the "columns" (in RDD terms, they are actually elements within a tuple) by their indices.  Data Frames and Datasets, on the other hand, allow us to use the much more natural .select() function and reference the columns by their names.  However, if we look very closely at the output of the Dataset command, we see that the .select() function returned a Data Frame, not a Dataset.  We need to remember that Datasets are simply strongly-typed Data Frames.  Therefore, we can cast the result back into a Dataset by creating a new case class for the new object we are creating.
Select Dataset (Typed)

<CODE START>

case class Adult_select (
  age: Int
  ,income: String
)

val ds_select = ds_adult
  .select("age", "income")
  .as[Adult_select]

ds_select.show(2)

<CODE END>

Obviously, this trend will repeat itself for every step, so we won't mention it again.  Interestingly, it's also possible to operate on Datasets using the .map() function with strongly-typed access to each field by name, something the generic Rows in a Data Frame don't give us.

Select Dataset (map)

<CODE START>

case class Adult_map (
  age: Int
  ,income: String
)

val ds_map = ds_adult
  .map(x => (x.age, x.income))
  .withColumnRenamed("_1", "age")
  .withColumnRenamed("_2", "income")
  .as[Adult_map]

ds_map.show(2)

<CODE END>

This is another advantage of Datasets.  However, using the .map() function with an anonymous function removes the column names, resulting in even more code to add them back.  All in all, the .select() approach is much simpler.  Next, let's take this one step further and create a new column, ageinmonths.
New Column RDD

<CODE START>

val rdd_col = rdd_adult
  .map(x => (x(0), x(0).toInt * 12))

rdd_col.take(2)

<CODE END>

New Column Data Frame

<CODE START>

val df_col = df_adult_names
  .withColumn("ageinmonths", $"age" * 12)
  .select("age", "ageinmonths")

df_col.show(2)

<CODE END>


New Column Dataset

<CODE START>

case class Adult_col (
  age: Int
  ,ageinmonths: Int
)

val ds_col = ds_adult
  .withColumn("ageinmonths", $"age" * 12)
  .select("age", "ageinmonths")
  .as[Adult_col]

ds_col.show(2)

<CODE END>

We see that the RDD code leverages the .map() function again, while the Data Frame and Dataset code leverage the .withColumn() and .select() functions.  Again, we see that the Data Frame and Dataset code sacrifices compactness for readability.  We've heard that there's a performance difference as well.  However, given the simplistic operations and small dataset, we were not able to verify this.  We'll dig into this in a later post.
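If you'd like to experiment yourself in the meantime, here's a rough sketch of one way to eyeball the timings: wrap each approach in a crude timer of our own and force execution with an action.  It's far from a rigorous benchmark.

<CODE START>

// A crude timer: run the block, force execution with an action, report the time.
def time[T](label: String)(block: => T): T = {
  val start = System.nanoTime()
  val result = block
  println(s"$label took ${(System.nanoTime() - start) / 1e6} ms")
  result
}

time("RDD")        { rdd_col.count() }
time("Data Frame") { df_col.count() }
time("Dataset")    { ds_col.count() }

<CODE END>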

To back up to a comment at the beginning of this post, why did we choose to use Scala to showcase this functionality instead of Python, a language that we are much more comfortable with and that is honestly much more readable?  Per the official Spark documentation,
Python does not have the support for the Dataset API. But due to Python’s dynamic nature, many of the benefits of the Dataset API are already available (i.e. you can access the field of a row by name naturally row.columnName). The case for R is similar.
While this isn't a post about the differences between Spark programming languages, we do see an interesting dichotomy arising.  Simply put, Java and Scala are great languages for developing robust, programmer-friendly applications, while Python and R are great languages for creating rich, analyst-friendly analyses.  In fact, it's quite common for analysts to use PySpark or SparkR to munge through large amounts of data, then pull a much smaller portion of that data into more manageable objects, like Pandas or R Data Frames for deeper analysis or visualizations.  This allows them to leverage the power of Spark when it's needed and keep it simpler when it isn't.

This seems like a great place to stop for today.  We hope that this post opened your eyes a little to the differences between RDDs, Data Frames and Datasets, as well as how you can leverage them in your analyses.  Stay tuned for the next post where we'll dig deeper and create even more complex transformations.  Thanks for reading.  We hope you found this informative.

Brad Llewellyn
Service Engineer - FastTrack for Azure
Microsoft
@BreakingBI
www.linkedin.com/in/bradllewellyn
llewellyn.wb@gmail.com
