Monday, October 7, 2019

Azure Databricks: RDDs, Data Frames and Datasets, Part 2

Today, we're going to continue talking about RDDs, Data Frames and Datasets in Azure Databricks.  If you haven't read the previous posts in this series, Introduction, Cluster Creation, Notebooks, Databricks File System (DBFS), Hive (SQL) Database and RDDs, Data Frames and Datasets (Part 1), they may provide some useful context.  If you are new to this material, it is especially important that you read through the previous post, as this post builds directly on what was discussed there.  You can find the files from this post in our GitHub Repository.  Let's move on to the core of this post, RDDs, Data Frames and Datasets.

Before we begin, the following code block will create all of the necessary objects from the previous post.

<CODE START>

val rdd_lines = sc.textFile("/databricks-datasets/adult/adult.data")

val rdd_adult = rdd_lines
  .filter(f => f != "")
  .map(l => l.split(", "))

val df_adult_nonames = sqlContext.read.csv("/databricks-datasets/adult/adult.data")

val df_adult_names = sqlContext.read
  .option("inferSchema", "true")
  .csv("/databricks-datasets/adult/adult.data").toDF(
    "age"
    ,"workclass"
    ,"fnlwgt"
    ,"education"
    ,"educationnum"
    ,"maritalstatus"
    ,"occupation"
    ,"relationship"
    ,"race"
    ,"sex"
    ,"capitalgain"
    ,"capitalloss"
    ,"hoursperweek"
    ,"nativecountry"
    ,"income"
  )

case class Adult (
  age: Int
  ,workclass: String
  ,fnlwgt: Double
  ,education: String
  ,educationnum: Double
  ,maritalstatus: String
  ,occupation: String
  ,relationship: String
  ,race: String
  ,sex: String
  ,capitalgain: Double
  ,capitalloss: Double
  ,hoursperweek: Double
  ,nativecountry: String
  ,income: String
)

val ds_adult = df_adult_names.as[Adult]

val rdd_select = rdd_adult
  .map(x => (x(0), x(14)))

val df_select = df_adult_names
  .select("age", "income")

case class Adult_select (
  age: Int
  ,income: String
)

val ds_select = ds_adult
  .select("age", "income")
  .as[Adult_select]

case class Adult_map (
  age: Int
  ,income: String
)

val ds_map = ds_adult
  .map(x => (x.age, x.income))
  .withColumnRenamed("_1", "age")
  .withColumnRenamed("_2", "income")
  .as[Adult_map]

val rdd_col = rdd_adult
  .map(x => (x(0), x(0).toInt * 12))

val df_col = df_adult_names
  .withColumn("ageinmonths", $"age" * 12)
  .select("age", "ageinmonths")

case class Adult_col (
  age: Int
  ,ageinmonths: Int
)

val ds_col = ds_adult
  .withColumn("ageinmonths", $"age" * 12)
  .select("age", "ageinmonths")
  .as[Adult_col]

<CODE END>

In the previous post, we looked at selecting and creating columns.  Now, let's take it one step further by filtering each object down to only the records with income "<=50K".

Filter RDD

<CODE START>

val rdd_fil = rdd_adult
  .filter(x => x(14) == "<=50K")

rdd_fil.count()

<CODE END>

Filter Data Frame

<CODE START>

import org.apache.spark.sql.functions._

val df_fil = df_adult_names
  .filter(ltrim($"income") === "<=50K")
df_fil.count()

<CODE END>

Filter Dataset

<CODE START>

import org.apache.spark.sql.functions._

val ds_fil = ds_adult
  .filter(ltrim($"income") === "<=50K")
ds_fil.count()

<CODE END>

We see that there are some differences between filtering RDDs, Data Frames and Datasets.  The first major difference is the same one we keep seeing: RDDs reference columns by index instead of by name.  There's also an interesting difference between the "==" and "===" equality operators.  Simply put, "==" is ordinary Scala object equality; it would compare the Column object itself to the string and simply return false.  "===", on the other hand, is Spark's column equality operator.  It builds a Column expression that compares the income column to the literal "<=50K", and Spark evaluates that expression against every row.  In other words, whenever we want to compare the values in a column to a literal or to another column inside filter(), "===" is the way to go.
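As a minimal sketch of the difference (assuming the df_adult_names Data Frame from above and the implicits that Databricks Scala notebooks import automatically):

<CODE START>

import org.apache.spark.sql.functions._

// "===" builds a Column expression that Spark evaluates row by row.
val rowWiseTest = ltrim($"income") === "<=50K"    // org.apache.spark.sql.Column
val df_fil_col = df_adult_names.filter(rowWiseTest)

// "==" is plain Scala object equality: it compares the Column object itself to
// the String, evaluates to the Boolean false and never touches the data.
val objectEquality = ltrim($"income") == "<=50K"  // Boolean = false

<CODE END>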

Interestingly, there was another difference caused by the way we imported our data.  Since we custom-built our RDD parsing algorithm to use <COMMA><SPACE> as the delimiter, we don't need to trim our RDD values.  However, we used the built-in sqlContext.read.csv() function for the Data Frame and Dataset, which doesn't trim by default.  So, we used the ltrim() function to remove the leading whitespace.  This function can be imported from the org.apache.spark.sql.functions library.

As a nice added bonus, filter doesn't change the structure of the record.  So, we didn't need to create a new case class for the Dataset.  Data Frames and Datasets also have access to the where() function that is identical to filter().
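For example, here's a sketch of the same filter written with where(), which should behave identically to df_fil above.

<CODE START>

import org.apache.spark.sql.functions._

// where() is just an alias for filter().
val df_fil_where = df_adult_names
  .where(ltrim($"income") === "<=50K")

df_fil_where.count()

<CODE END>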

Now that we've seen how easy it is to filter, let's try calculating the sum of the "capitalgain" column.
Sum RDD

<CODE START>

val rdd_sum = rdd_adult
  .map(x => x(10).toDouble)
  .reduce((t, c) => t + c)

rdd_sum

<CODE END>

Sum Data Frame

 <CODE START>

import org.apache.spark.sql.functions._

val df_sum = df_adult_names
  .agg(sum("capitalgain"))
  .first()
  .getDouble(0)

df_sum

<CODE END>


Sum Dataset


<CODE START>

import org.apache.spark.sql.functions._

val ds_sum = ds_adult
  .agg(sum("capitalgain"))
  .first()
  .getDouble(0)

ds_sum

<CODE END>

Here, we see another substantial difference between RDDs, Data Frames and Datasets.  RDDs make use of the reduce() function.  Basically, we use the map() function to pass in only a single value for each element: the capitalgain for each person.  Then, the reduce() function "reduces" the capitalgains down to a single number by repeatedly combining pairs of values.  However, it doesn't guarantee the order in which this happens.  So, t and c represent capitalgain values (or partial sums) from the RDD, and they are added together until we arrive at a single, final number, the sum.  Since we can't control the order in which the values are reduced, the reduce() function requires our operation to be commutative and associative, which is the case for addition.
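As a tiny, hypothetical illustration of why that requirement matters:

<CODE START>

// reduce() keeps combining pairs of values until only one is left.
val total = sc.parallelize(Seq(1.0, 2.0, 3.0, 4.0)).reduce((t, c) => t + c)   // 10.0

// A non-associative operation like (t, c) => t - c could return different
// results on different runs, because the pairing order isn't guaranteed.

<CODE END>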

We see that Data Frames and Datasets make this substantially easier via the use of the agg() function.  This function allows us to define aggregations against Data Frames or Datasets.  The complete list of aggregations that can be passed into the agg() function can be found here.  The agg() function outputs a 1-by-1 Data Frame, meaning that we need to retrieve the value using the first() and getDouble() functions.
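As a quick sketch, agg() also accepts multiple aggregations at once, still returning a one-row Data Frame.  The aliases here are just illustrative names.

<CODE START>

import org.apache.spark.sql.functions._

// Several aggregations in a single pass over the data.
val df_multi = df_adult_names
  .agg(
    sum("capitalgain").alias("totalgain")
    ,avg("hoursperweek").alias("avghours")
  )

df_multi.show()

<CODE END>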

Next, let's move on to a more complex aggregation, average age.
Average RDD

<CODE START>

val rdd_avg_tup = rdd_adult
  .map(x => x(0).toInt)
  .aggregate((0, 0))(
    (w, age) => (w._1 + age, w._2 + 1)
    ,(a, b) => (a._1 + b._1, a._2 + b._2)
  )
val rdd_avg = rdd_avg_tup._1.toDouble / rdd_avg_tup._2

rdd_avg

<CODE END>


Average Data Frame

<CODE START>

import org.apache.spark.sql.functions._

val df_avg = df_adult_names
  .agg(avg("age"))
  .first()
  .getDouble(0)

df_avg

<CODE END>

Average Dataset

<CODE START>

import org.apache.spark.sql.functions._

val ds_avg = ds_adult
  .agg(avg("age"))
  .first()
  .getDouble(0)

ds_avg

<CODE END>

We see the complexity of RDD operations ramping up even further.  The reduce() function is fantastic for performing a single, basic operation that is commutative and associative.  "Average" does not fall into this category.  Instead, it's a non-commutative, non-associative operation (division) applied to the results of two commutative, associative operations, sum and count.  So, we need to use the more complex and flexible aggregate() function.

The first part of the aggregate function is the "zero value", i.e. the "starting value" for the aggregation.  In our case, we use the tuple (0, 0) to store the starting sum and starting count.  Then, we need to define the "within-partition" calculation.

This calculation takes in two variables, w and age.  It pulls "w", the running (sum, count) tuple that starts at the zero value, from the first part of the aggregate() function, and it pulls "age" from the RDD passed in by the previous map() function.  Using "w" and "age", it builds a tuple for each partition in the RDD.  The first element of the tuple is the sum, calculated by adding all of the "age" values, and the second element is the count, calculated by adding 1 for each "age" value.

Once the (sum, count) tuple is calculated for each partition, the final calculation functions identically to the way that reduce() does.  It pulls the (sum, count) tuples from the previous step and combines them together in whatever order it feels like.  To turn a set of sums and counts into an overall sum and an overall count, all we need to do is add all of the sums together and add all of the counts together.

Finally, we calculate the average by dividing the overall sum by the overall count.  We also have to cast the sum to a double beforehand because, as is true in many programming languages, Int / Int = Int in Scala, which would give us an answer of 38, instead of 38.581...
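To make the two functions concrete, here's a tiny, hypothetical version of the same pattern on a handful of numbers:

<CODE START>

// Averaging 1 through 4 with the same (sum, count) accumulator.
val nums = sc.parallelize(Seq(1, 2, 3, 4))

val (s, c) = nums.aggregate((0, 0))(
  (w, n) => (w._1 + n, w._2 + 1)          // within a partition: add the value, bump the count
  ,(a, b) => (a._1 + b._1, a._2 + b._2)   // across partitions: add the sums, add the counts
)

s.toDouble / c   // 2.5

<CODE END>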

To add even more complexity, the syntax for pulling elements out of a Tuple is tuple._1, tuple._2, etc., where the indexing starts at 1, while the syntax for pulling elements out of an Array is array(index), where the indexing starts at 0.
Tuple

<CODE START>

val ttuple = (1,2,3)

ttuple._1

<CODE END>


Array

<CODE START>

val tarray = Array(1,2,3)

tarray(0)

<CODE END>

On a completely separate, but similarly complex topic, we also haven't discussed partitions at this point.  Simply put, Spark is a distributed processing framework and many objects that we use are split across partitions to allow for parallel processing.  RDDs, Data Frames and Datasets are some of these objects.  Managing partitions is a more advanced topic that we may discuss in a later post.
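As a quick peek (assuming the rdd_adult RDD from above), the partitioning is visible, and adjustable, directly on the RDD.

<CODE START>

// How many partitions the RDD is currently split into.
rdd_adult.getNumPartitions

// Reshuffle into 8 partitions; the number 8 is arbitrary here.
val rdd_repart = rdd_adult.repartition(8)
rdd_repart.getNumPartitions

<CODE END>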

Moving on, we see that Data Frames and Datasets make this operation much easier by providing a built-in avg() function.  For our final transformation in this post, let's calculate average age per income level.
Average Age by Income RDD

<CODE START>

val rdd_aapi = rdd_adult
  .map(x => (x(14), x(0).toInt))
  .aggregateByKey((0, 0))(
    (w, age) => (w._1 + age, w._2 + 1)
    ,(a, b) => (a._1 + b._1, a._2 + b._2)
  )
  .map(x => (x._1, x._2._1.toDouble / x._2._2))

rdd_aapi.collect()

<CODE END>


Average Age by Income Data Frame

<CODE START>

import org.apache.spark.sql.functions._

val df_aapi = df_adult_names
  .groupBy("income")
  .agg(avg("age"))

df_aapi.show()

<CODE END>

Average Age by Income Dataset

<CODE START>

import org.apache.spark.sql.functions._

case class Adult_aapi (
  income: String
  ,avgage: Double
)

val ds_aapi = ds_adult
  .groupBy("income")
  .agg(avg("age"))
  .withColumnRenamed("avg(age)", "avgage")
  .as[Adult_aapi]

ds_aapi.show()

<CODE END>

The RDD code is almost identical to the previous example.  However, there are a couple of differences.  First, we are creating a 2-element tuple in the first map() function, with the first element being the key, "income" in this case, and the second element being the value, "age" in this case.  Then, we use the aggregateByKey() function to perform the exact same aggregations as before.  This time, however, it aggregates the values by implicitly pulling them from the second element of the tuples passed in from the previous map() function.  It's important to note that we don't assign keys or values at any point here.  They are assumed to be the first and second elements of the tuples, respectively.

Next, since aggregateByKey() produces one result per key, it returns an RDD of (key, (sum, count)) tuples instead of a single tuple.  So, we can perform our final division by using another map() function, returning the income and average age values as an RDD of tuples.
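For comparison, here's a sketch of the same average-by-income written with reduceByKey(), which only needs the single "merge two accumulators" function:

<CODE START>

val rdd_aapi_alt = rdd_adult
  .map(x => (x(14), (x(0).toInt, 1)))                   // (income, (age, 1))
  .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))    // (income, (sum of ages, count))
  .map(x => (x._1, x._2._1.toDouble / x._2._2))         // (income, average age)

rdd_aapi_alt.collect()

<CODE END>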

Again, the Data Frame and Dataset code is drastically simpler, relying heavily on the groupBy() and agg() functions to do the heavy lifting.  Also, since groupBy().agg() returns a Data Frame, we have to explicitly cast our Dataset again.  This also requires a column rename, both because the default name avg(age) contains parentheses, which are not allowed in Dataset column names, and because the column needs to match the avgage field in our case class.

Hopefully this post shed a little more light on the data transformation methods in Azure Databricks.  We're starting to notice a trend of RDD operations being much more complex than their Data Frame and Dataset counterparts.  Does this trend continue with other types of transformations?  Does it buy us any potential performance improvements?  Stay tuned for the next post where we'll dig even deeper into these questions.  Thanks for reading.  We hope you found this informative.

Brad Llewellyn
Service Engineer - FastTrack for Azure
Microsoft
@BreakingBI
www.linkedin.com/in/bradllewellyn
llewellyn.wb@gmail.com

Monday, September 16, 2019

Azure Databricks: RDDs, Data Frames and Datasets, Part 1

Today, we're going to talk about RDDs, Data Frames and Datasets in Azure Databricks.  If you haven't read the previous posts in this series, Introduction, Cluster Creation, Notebooks, Databricks File System (DBFS) and Hive (SQL) Database, they may provide some useful context.  You can find the files from this post in our GitHub Repository.  Let's move on to the core of this post, RDDs, Data Frames and Datasets.

It could be argued that the most important component of any Data Analysis is the component that contains the data.  In Spark, there are a number of different data containers, primarily RDDs, Data Frames and Datasets.  So, what are these containers and when should we use them?

Resilient Distributed Dataset, aka RDD, is "a fault-tolerant collection of elements that can be operated on in parallel."  In layman's terms, an RDD is a collection of "things", nothing more.  These "things" can take many forms, including sentences from a book, records from a database or machine data represented in binary.  To help us visualize, here's a basic RDD created from a CSV file.  For reasons we'll touch on later, we'll be using Scala throughout this post.
Lines RDD

<CODE START>

val rdd_lines = sc.textFile("/databricks-datasets/adult/adult.data")
rdd_lines.take(2)

<CODE END>

We see that creating an RDD can be done with one easy function.  In this snippet, sc represents the default SparkContext.  This is extremely important, but is better left for a later post.  SparkContext offers the .textFile() function, which creates an RDD from a text file, parsing each line into its own element in the RDD.  These lines happen to represent CSV records.  However, there are many other common examples that use lines of free text.

It should also be noted that we can use the .collect() and .take() functions to view the contents of an RDD.  The difference between .collect() and .take() is that .take() allows us to specify the number of elements we want to retrieve, whereas .collect() returns the entire RDD.
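For instance, here's a quick sketch of the difference.  The commented line works, but pulling an entire large RDD back to the driver can be expensive.

<CODE START>

rdd_lines.take(5)       // just the first 5 elements
// rdd_lines.collect()  // every element in the RDD

<CODE END>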

There's also an interesting conflict in the way that Scala displays arrays.  Scala displays each element of the array, separating the elements with commas.  However, it doesn't display quotes around string values.  In our case, since our elements are actually CSV records, it ends up looking like each value is an individual element.  This obviously isn't the case because we used .take(2) to display them.  We added the red line in the picture to show where the elements split.

While we could use this RDD for the rest of this post, it will probably make more sense later if we split the data along the commas to create something that more closely resembles a real-world dataset.
Adult RDD

<CODE START>

val rdd_adult = rdd_lines
  .filter(f => f != "")
  .map(l => l.split(", "))
rdd_adult.take(2)

<CODE END>

The code to transform the raw lines of CSV text into their own arrays is a little more complex and deserves its own post.  Here are some good links for getting started with Transformations and Actions.  Fortunately, splitting the elements into their own sub-arrays also fixed the awkward display issue.  Next, let's look at Data Frames.
Unnamed and Untyped Data Frame

<CODE START>

val df_adult_nonames = sqlContext.read.csv("/databricks-datasets/adult/adult.data")
df_adult_nonames.show(2)

<CODE END>

We see that creating a Data Frame from a CSV is a good deal simpler than the RDD was.  The ability to use the .read.csv() function abstracts away what we had to do manually before.  A major difference is that we create Data Frames using a SQLContext, which is a layer on top of the SparkContext that allows us to perform more SQL-like operations.  Again, contexts are very important and deserve their own post.

Storing the data as a Data Frame gives us the substantial advantage of storing each individual data point in a named column.  This allows us to easily manipulate individual columns.  We'll see more on this later.  The previous screenshot showed that we can create Data Frames without supplying any column information at all.  However, we also have the ability to supply the column names so we don't have to use the default ones.
Named and Typed Data Frame

<CODE START>

val df_adult_names = sqlContext.read
  .option("inferSchema", "true")
  .csv("/databricks-datasets/adult/adult.data").toDF(
    "age"
    ,"workclass"
    ,"fnlwgt"
    ,"education"
    ,"educationnum"
    ,"maritalstatus"
    ,"occupation"
    ,"relationship"
    ,"race"
    ,"sex"
    ,"capitalgain"
    ,"capitalloss"
    ,"hoursperweek"
    ,"nativecountry"
    ,"income"
  )
df_adult_names.show(2)

<CODE END>

We see that the .option() and .toDF() functions allow us to infer the schema and pass the column names easily.  Another difference between RDDs and Data Frames is that we display Data Frames using the .show() function instead of .take().  Interestingly, the .collect() and .take() functions can be used to turn Data Frames back into RDDs.  Data Frames and Datasets also have a .rdd attribute that accomplishes this.
Take from Data Frame

<CODE START>

df_adult_names.take(2)

<CODE END>
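And here's a small sketch of the .rdd attribute mentioned above.

<CODE START>

// The .rdd attribute hands back an RDD of Row objects.
val rdd_from_df = df_adult_names.rdd
rdd_from_df.take(2)

<CODE END>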

Speaking of Datasets, let's take a look at one.  Basically, Data Frames are a special type of Dataset.  Technically speaking, a Data Frame is an UNTYPED Dataset of Rows.  Basically, a Row is designed to closely resemble what a record from a relational table would look like.  A Row contains a fixed number of elements that represent individual objects.  We can also use STRONGLY-TYPED Datasets of custom classes, other than Rows.  For instance, we can create a Dataset out of our named and typed Data Frame by explicitly creating a case class for it.
Dataset

<CODE START>

case class Adult (
  age: Int
  ,workclass: String
  ,fnlwgt: Double
  ,education: String
  ,educationnum: Double
  ,maritalstatus: String
  ,occupation: String
  ,relationship: String
  ,race: String
  ,sex: String
  ,capitalgain: Double
  ,capitalloss: Double
  ,hoursperweek: Double
  ,nativecountry: String
  ,income: String
)

val ds_adult = df_adult_names.as[Adult]

ds_adult.show(2)

<CODE END>

Having a strongly-typed Dataset does offer a few advantages from a deep technical perspective.  Simply put, if we can be certain that an object is of a certain type, then we can build more robust applications.  Interestingly, a major difference between Data Frames and Datasets is that Datasets can be reverted back to the original objects, whereas Data Frames cannot.  For instance, assume that we have an RDD of objects.  If we cast this RDD to a Dataset and back to an RDD, then we will have our original RDD back.  However, if we cast this RDD to a Data Frame and back to an RDD, then we will have a new RDD of Rows because the Data Frame can only hold Rows.
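To see what those Rows look like up close, here's a hedged sketch; the getInt call assumes that age was inferred as an integer.

<CODE START>

val firstRow = df_adult_names.first()

firstRow.getInt(0)                  // age, by position
firstRow.getAs[String]("income")    // income, by column name

<CODE END>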

Now, let's try a few basic data manipulation operations.  First, let's select two "columns" from our data, age and income.
Select RDD

<CODE START>

val rdd_select = rdd_adult
  .map(x => (x(0), x(14)))

rdd_select.take(2)

<CODE END>


Select Data Frame

<CODE START>

val df_select = df_adult_names
  .select("age", "income")

df_select.show(2)

<CODE END>

Select Dataset

<CODE START>

val ds_select = ds_adult
  .select("age", "income")
ds_select.show(2)

<CODE END>

We see that RDDs require the awkward .map() function, where we build an anonymous function and reference the "columns" (in RDD terms, they are just elements within an array) by their indices.  Data Frames and Datasets, on the other hand, allow us to use the much more natural .select() function and reference the columns by their names.  However, if we look very closely at the output for the Dataset command, we see that the .select() function returned a Data Frame, not a Dataset.  We need to remember that Datasets are simply strongly-typed Data Frames.  Therefore, we can cast this into a Dataset again by creating a new case class for the new object that we are creating.
Select Dataset (Typed)

<CODE START>

case class Adult_select (
  age: Int
  ,income: String
)

val ds_select = ds_adult
  .select("age", "income")
  .as[Adult_select]

ds_select.show(2)

<CODE END>

Obviously, this trend will repeat itself for every step, so we won't mention it again.  Interestingly, it's also possible to operate on Datasets (but NOT Data Frames) using the .map() function.

Select Dataset (map)

<CODE START>

case class Adult_map (
  age: Int
  ,income: String
)

val ds_map = ds_adult
  .map(x => (x.age, x.income))
  .withColumnRenamed("_1", "age")
  .withColumnRenamed("_2", "income")
  .as[Adult_map]

ds_map.show(2)

<CODE END>

This is another advantage of Datasets.  However, using the .map() function with an anonymous function removes the column names, resulting in even more code to add them back.  All in all, the .select() approach is much simpler.
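That said, here's a hedged sketch of an alternative: mapping straight into the case class keeps the column names and the strong typing in one step, with no renaming needed.

<CODE START>

val ds_map2 = ds_adult
  .map(x => Adult_map(x.age, x.income))

ds_map2.show(2)

<CODE END>

Next, let's take this one step further and create a new column, ageinmonths.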
New Column RDD

<CODE START>

val rdd_col = rdd_adult
  .map(x => (x(0), x(0).toInt * 12))

rdd_col.take(2)

<CODE END>
New Column Data Frame

<CODE START>

val df_col = df_adult_names
  .withColumn("ageinmonths", $"age" * 12)
  .select("age", "ageinmonths")

df_col.show(2)

<CODE END>


New Column Dataset

<CODE START>

case class Adult_col (
  age: Int
  ,ageinmonths: Int
)

val ds_col = ds_adult
  .withColumn("ageinmonths", $"age" * 12)
  .select("age", "ageinmonths")
  .as[Adult_col]

ds_col.show(2)

<CODE END>

We see that the RDD code leverages the .map() function again, while the Data Frame and Dataset code leverage the .withColumn() and .select() functions.  Again, we see that the Data Frame and Dataset code sacrifices compactness for readability.  We've heard that there's also a performance difference as well.  However, given the simplistic operations and small dataset, we were not able to verify this.  We'll dig into this in a later post.
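One low-effort way to start investigating, a sketch rather than a benchmark: explain() prints the physical plan that the Catalyst optimizer produces for Data Frame and Dataset operations, while RDD lineage has no equivalent optimizer.

<CODE START>

// Compare the optimized physical plans for the two new-column examples above.
df_col.explain()
ds_col.explain()

<CODE END>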

To back up to a comment at the beginning of this post, why did we choose to use Scala to showcase this functionality instead of Python, a language that we are much more comfortable with and is honestly much more readable?  Per the official Spark documentation,
Python does not have the support for the Dataset API. But due to Python’s dynamic nature, many of the benefits of the Dataset API are already available (i.e. you can access the field of a row by name naturally row.columnName). The case for R is similar.
While this isn't a post about the differences between Spark programming languages, we do see an interesting dichotomy arising.  Simply put, Java and Scala are great languages for developing robust, programmer-friendly applications, while Python and R are great languages for creating rich, analyst-friendly analyses.  In fact, it's quite common for analysts to use PySpark or SparkR to munge through large amounts of data, then pull a much smaller portion of that data into more manageable objects, like Pandas or R Data Frames for deeper analysis or visualizations.  This allows them to leverage the power of Spark when it's needed and keep it simpler when it isn't.

This seems like a great place to stop for today.  We hope that this post opened your eyes a little to the differences between RDDs, Data Frames and Datasets, as well as how you can leverage them in your analyses.  Stay tuned for the next post where we'll dig deeper and create even more complex transformations.  Thanks for reading.  We hope you found this informative.

Brad Llewellyn
Service Engineer - FastTrack for Azure
Microsoft
@BreakingBI
www.linkedin.com/in/bradllewellyn
llewellyn.wb@gmail.com

Monday, August 26, 2019

Azure Databricks: Hive (SQL) Database

Today, we're going to talk about the Hive Database in Azure Databricks.  If you haven't read the previous posts in this series, Introduction, Cluster Creation, Notebooks and Databricks File System (DBFS), they may provide some useful context.  You can find the files from this post in our GitHub Repository.  Let's move on to the core of this post, Hive.

In the previous post, we looked at how to store files, including unstructured and semi-structured data, in DBFS.  Now, let's look at how to store structured data in a SQL format.

Each Databricks Workspace comes with a Hive Metastore automatically included.  This provides us the ability to create Databases and Tables across any of the associated clusters and notebooks.  Let's start by creating and populating a simple table using SQL.
Simple Table from SQL

<CODE START>

%sql

DROP TABLE IF EXISTS Samp;

CREATE TABLE Samp AS
SELECT 1 AS Col
UNION ALL
SELECT 2 AS Col;


SELECT * FROM Samp

<CODE END>

We see that there was no configuration necessary to be able to create a SQL table in the Notebook.  Now that this table is created, we can query it from a different notebook connected to a different cluster, as long as they are within the same Workspace.
Different Notebook and Cluster

<CODE START>

%sql


SELECT * FROM Samp

<CODE END>

We're not just limited to creating tables using SQL.  We can also create tables using Python, R and Scala as well.
Simple Table from Python

<CODE START>

%python

from pyspark.sql import Row

pysamp = spark.createDataFrame([Row(Col=1),Row(Col=2)])

pysamp.write.saveAsTable('PySamp')

<CODE END>

<CODE START>

%sql

SELECT * FROM PySamp

<CODE END>

Simple Table from R

<CODE START>

%r

library(SparkR)

rsamp <- createDataFrame(data.frame(Col = c(1,2)))
registerTempTable(rsamp, "RSampT")
s = sql("CREATE TABLE RSamp AS SELECT * FROM RSampT")

dropTempTable("RSampT")

<CODE END>

<CODE START>

%sql


SELECT * FROM RSamp

<CODE END>

Simple Table from Scala

<CODE START>

%scala

case class samp(Col: Int)
val samp1 = samp(1)
val samp2 = samp(2)
val scalasamp = Seq(samp1, samp2).toDF()

scalasamp.write.saveAsTable("ScalaSamp")

<CODE END>

<CODE START>

%sql


SELECT * FROM ScalaSamp

<CODE END>

Each of these three languages has a different way of creating Spark Data Frames.  Once the Data Frames are created, Python and Scala both support the .write.saveAsTable() functionality to create a SQL table directly.  However, R requires us to create a Temporary Table first using the registerTempTable() function, then use that to create a Persisted Table via the sql() function.  This is a great segue into the next topic, Temporary Tables.

A Temporary Table, also known as a Temporary View, is similar to a table, except that it's only accessible within the Session where it was created.  This allows us to use SQL tables as intermediate stores without worrying about what else is running in other clusters, notebooks or jobs.  Since Temporary Views share the same namespace as Persisted Tables, it's advisable to ensure that they have unique names across the entire workspace.  Let's take a look.
Temporary View from SQL

<CODE START>

%sql

DROP TABLE IF EXISTS Tempo;

CREATE TEMPORARY VIEW Tempo AS
SELECT 1 AS Col
UNION ALL
SELECT 2 AS Col;


SELECT * FROM Tempo

<CODE END>

Same Notebook, Same Cluster
Same Notebook, Different Cluster
Different Notebook, Same Cluster

<CODE START>

%sql

SELECT * FROM Tempo

<CODE END>

We see that the Temporary View is only accessible from the Session that created it.  Just as before, we can create Temporary Views using Python, R and Scala as well.
Temporary View from Python

<CODE START>

%python

from pyspark.sql import Row

pysamp = spark.createDataFrame([Row(Col=1),Row(Col=2)])

pysamp.createOrReplaceTempView("PyTempo")

<CODE END>

<CODE START>

%sql


SELECT * FROM PyTempo

<CODE END>

Temporary View from R

<CODE START>

%r

library(SparkR)

rsamp <- createDataFrame(data.frame(Col = c(1,2)))

registerTempTable(rsamp, "RTempo")

<CODE END>

<CODE START>

%sql


SELECT * FROM RTempo

<CODE END>

Temporary View from Scala

<CODE START>

%scala

case class samp(Col: Int)
val samp1 = samp(1)
val samp2 = samp(2)
val scalasamp = Seq(samp1, samp2).toDF()

scalasamp.createOrReplaceTempView("ScalaTempo")

<CODE END>

<CODE START>

%sql


SELECT * FROM ScalaTempo

<CODE END>

Again, we see the similarities between Python and Scala, with both supporting the .createOrReplaceTempView() function.  R is the odd man out here using the same registerTempTable() function we saw earlier.

These are great ways to create Persisted and Temporary Tables from data that we already have access to within the notebook.  However, Hive gives us access to something that is simply not possible with most other SQL technologies, External Tables.

Simply put, an External Table is a table built directly on top of a folder within a data source.  This means that the data is not hidden away in some proprietary SQL format.  Instead, the data is completely accessible to outside systems in its native format.  The main reason for this is that it gives us the ability to create "live" queries on top of text data sources.  Every time a query is executed against the table, the query is run against the live data in the folder.  This means that we don't have to run ETL jobs to load data into the table.  Instead, all we need to do is put the structured files in the folder and the queries will automatically surface the new data.

In the previous post, we mounted an Azure Blob Storage Container that contains a csv file.  Let's build an external table on top of that location.
Create External Table

<CODE START>

%sql

CREATE EXTERNAL TABLE Ext (
  age INT
  ,capitalgain INT
  ,capitalloss INT
  ,education STRING
  ,educationnum INT
  ,fnlwgt INT
  ,hoursperweek INT
  ,income STRING
  ,maritalstatus STRING
  ,nativecountry STRING
  ,occupation STRING
  ,race STRING
  ,relationship STRING
  ,sex STRING
  ,workclass STRING
)
LOCATION '/mnt/breakingbi/'
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE;

SELECT * FROM Ext

WHERE education != 'education'

<CODE END>

We see that it's as simple as defining the schema, location and file type.  The one downside is that the current version of Azure Databricks at the time of writing does not support skipping header rows.  So, we do need to add a line of logic to remove these records.  Before we move on to how to hide that issue, let's look at one neat piece of functionality with External Tables.  Not only do they support reading from file locations, they also support writing to file locations.  Let's duplicate all of the records in this table and see what happens to the mounted location.
Insert Into External Table
Mounted Location Post-Insert
We see that the number of records in the table has doubled.  When we look at the mounted location, we also see that some new files have been created.  In typical Spark style, these files have very long, unique names, but the External Table doesn't care about any of that.

Finally, let's look at how to clean up our external table so that it doesn't show the header rows anymore.  We can do this by creating a View (not to be confused with the Temporary View we discussed earlier).
Create View

<CODE START>

%sql

CREATE VIEW IF NOT EXISTS vExt AS
SELECT * FROM Ext
WHERE education != 'education';


SELECT * FROM vExt

<CODE END>

A View is nothing more than a canned SELECT query that can be queried as if it were a Table.  Views are a fantastic way to obscure complex technical logic from end users.  All they need to be able to do is query the table or view; they don't need to worry about the man behind the curtain.

We hope this post enlightened you to the powers of using SQL within Azure Databricks.  While Python, R and Scala definitely have their strengths, the ability to combine those with SQL provide a level of functionality that isn't really matched by any other technology on the market today.  The official Azure Databricks SQL documentation can be found here.  Stay tuned for the next post where we'll dig into RDDs, Data Frames and Datasets.  Thanks for reading.  We hope you found this informative.

Brad Llewellyn
Service Engineer - FastTrack for Azure
Microsoft
@BreakingBI
www.linkedin.com/in/bradllewellyn
llewellyn.wb@gmail.com

Monday, August 5, 2019

Azure Databricks: Databricks File System (DBFS)

Today, we're going to talk about the Databricks File System (DBFS) in Azure Databricks.  If you haven't read the previous posts in this series, Introduction, Cluster Creation and Notebooks, they may provide some useful context.  You can find the files from this post in our GitHub Repository.  Let's move on to the core of this post, DBFS.

As we mentioned in the previous post, there are three major concepts for us to understand about Azure Databricks, Clusters, Code and Data.  For this post, we're going to talk about the storage layer underneath Azure Databricks, DBFS.  Since Azure Databricks manages Spark clusters, it requires an underlying Hadoop Distributed File System (HDFS).  This is exactly what DBFS is.  Basically, HDFS is the low cost, fault-tolerant, distributed file system that makes the entire Hadoop ecosystem work.  We may dig deeper into HDFS in a later post.  For now, you can read more about HDFS here and here.

Let's start by looking at the current DBFS structure.
Add Data
We can do this by selecting the "Data" button from the left navigation pane, then selecting the "Add Data" button at the top of the fly-out.  It's important to note that databases and tables are not accessible unless there is a cluster running.  However, the data is still present in DBFS. It's just not currently accessible as a table.
DBFS
In the "Create New Table" window, we can see the contents of DBFS by selecting the "DBFS" tab and navigating through the folders.  As seen in both screenshots, we don't have any tables currently or any data stored in DBFS.  So, there's not much to see here.  However, there are actually a few other folders that can't be seen in this UI.  We can access these by using the %fs commands within a Notebook.  If you're not familiar with Notebooks, check out our previous post.
fs ls

<CODE START>

%fs

ls

<CODE END>

We can use the familiar ls command to see what's in each of these folders.
FileStore

<CODE START>

%fs

ls /FileStore/

<CODE END>

The "FileStore" folder contains all of our accessible data, including Database and Table data, plots created in Notebooks and JAR files.  You can read more about the FileStore here.
databricks-datasets

<CODE START>

%fs

ls /databricks-datasets/

<CODE END>

The "databricks-datasets" folder contain a number of different sample datasets to use.  We'll be sure to leverage some of these throughout this series.  There are also some README files that we could look at.  We'll see how to do this later in this post.
databricks-results

<CODE START>

%fs

ls /databricks-results/

<CODE END>

There seems to be something blocking us from viewing the "databricks-results" folder.  It's not clear whether this is by design or some type of bug.  We would guess that this folder is used by the Databricks platform to store intermediate results, but that's just a guess based on the folder name.
ml

<CODE START>

%fs

ls /ml/

<CODE END>

The "ml" folder is currently empty.  Hypothesizing again, we think this folder may be used for MLFlow functionality.  This is a topic that we're looking forward to exploring in a later post.  You can read more about MLFlow here.
tmp

<CODE START>

%fs

ls /tmp/

<CODE END>

The "tmp" folder contains a number of subfolders that varies based on the size of the environment.  Guessing one final time, it looks like this folder houses Hive temp files used for intermediate calculations.  This makes us question or our earlier guess about the "databricks-results" folder.  We'll be sure to come back with more information once we find out more about these additional folders.

Interestingly, there's a level of folders outside of DBFS that can be accessed using %sh commands.
sh ls

<CODE START>

%sh

ls /

<CODE END>

One of the folders we see here is "dbfs".  Obviously this takes us to the DBFS folders we were looking at earlier.  More advanced Databricks users could potentially delve deeper into some of these other folders.

Now that we've poked around the folders using ls, let's take a look at the README files we found earlier.  To do this, we first need to copy them from the "databricks-datasets" folder to the "FileStore" folder.
Copy Readme Files

<CODE START>

%fs

cp /databricks-datasets/README.md /FileStore/readme/README.md

<CODE END>

<CODE START>

%fs

cp /databricks-datasets/SPARK_README.md /FileStore/readme/SPARK_README.md

<CODE END>

After copying the files, they can be downloaded from any web browser using the following format:
https://<INSTANCE>/files/<FILE_PATH>?o=<WORKSPACE_ID>
The <FILE_PATH> was decided in our earlier fs command and the <INSTANCE> and <WORKSPACE_ID> can be found in the URL for the Databricks cluster.  For instance, our Databricks URL is
https://eastus2.azuredatabricks.net/?o=3558890778145077#
This parses to
<INSTANCE> = eastus2.azuredatabricks.net
<FILE_PATH> = readme/README.md
<WORKSPACE_ID> = 3558890778145077
So, the final URL is
https://eastus2.azuredatabricks.net/files/readme/README.md?o=3558890778145077
This URL provides a download for the README.md file.  Changing <FILE_PATH> to "readme/SPARK_README.md" will yield the SPARK_README.md file.  This methodology works for any file that we copy into the FileStore folder.

The files can be opened in almost any text editor or markdown parser.  They contain basic information about Databricks and Spark.

Finally, let's take a look at the production-grade way to work with data in Databricks.  We can accomplish this via Mounting.  Mounting allows us to reference external file stores, such as Azure Blob Storage, Azure Data Lake Store Gen1 and Azure Data Lake Store Gen2, as if they are part of DBFS.  The dbutils.fs.mount() function can accomplish this, with the syntax varying slightly between Scala and Python.  In Python, we can use the following command to mount an Azure Blob Storage account:
dbutils.fs.mount(
source = "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net",
mount_point = "/mnt/<mount-name>",
extra_configs = {"fs.azure.account.key.<your-storage-account-name>.blob.core.windows.net":"<access-key>"})
Mount Azure Blob Storage

<CODE START>


#dbutils.fs.mount(
#  source = "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net",
#  mount_point = "/mnt/<mount-name>",
#  extra_configs = {"fs.azure.account.key.<your-storage-account-name>.blob.core.windows.net":"<access-key>"})

dbutils.fs.mount(
  source = "wasbs://blob@breakingbi.blob.core.windows.net",
  mount_point = "/mnt/breakingbi",
  extra_configs = {"fs.azure.account.key.breakingbi.blob.core.windows.net":"aKNiYJntZUA9asI/uYH1KnB2OzydeD8akiEnhp4s4LKedqXRjNCcQ0aLRYIhMbt/mMEr3sJAmYc1lrCbBeHfIw=="})

<CODE END>

<CODE START>


%fs

ls /mnt/breakingbi

<CODE END>

By looking at the mounted location with ls, we can see the adult.csv file that we placed there.  You can read more about mounting at the following links: Azure Blob Storage, Azure Data Lake Store Gen1 and Azure Data Lake Store Gen2.  Mounting is especially helpful as it allows us to create SQL tables that sit on top of live data in a mounted external data store, but that's a topic for another day.  As a sidenote, access keys and passwords should never be stored as plain text in Notebooks.  Instead, Azure Databricks offers a Secrets API backed by Azure Key Vault.  We've covered this briefly in a previous post and will likely do so again in more depth.
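As a hedged sketch of what that looks like in Scala (the secret scope and key names here are placeholders, not ones that exist in our workspace):

<CODE START>

// Pull the storage account key from a secret scope instead of pasting it into the notebook.
dbutils.fs.mount(
  source = "wasbs://blob@breakingbi.blob.core.windows.net",
  mountPoint = "/mnt/breakingbi",
  extraConfigs = Map(
    "fs.azure.account.key.breakingbi.blob.core.windows.net" ->
      dbutils.secrets.get(scope = "breakingbi-scope", key = "storage-access-key")
  )
)

<CODE END>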

Hopefully, this post helped unravel a little of what's going on inside the Databricks File System.  Leveraging DBFS is critical to creating an efficient, secure, production-quality Databricks environment.  The official Azure Databricks DBFS documentation can be found here.  Stay tuned for the next post where we'll dig into the SQL Database underlying Azure Databricks.  Thanks for reading.  We hope you found this informative.

Brad Llewellyn
Service Engineer - FastTrack for Azure
Microsoft
@BreakingBI
www.linkedin.com/in/bradllewellyn
llewellyn.wb@gmail.com