Monday, August 26, 2019

Azure Databricks: Hive (SQL) Database

Today, we're going to talk about the Hive Database in Azure Databricks.  If you haven't read the previous posts in this series, Introduction, Cluster Creation, Notebooks and Databricks File System (DBFS), they may provide some useful context.  You can find the files from this post in our GitHub Repository.  Let's move on to the core of this post, Hive.

In the previous post, we looked at how to store files and unstructured or semi-structured data in DBFS.  Now, let's look at how to store structured data in a SQL format.

Each Databricks Workspace comes with a Hive Metastore automatically included.  This provides us the ability to create Databases and Tables across any of the associated clusters and notebooks.  Let's start by creating and populating a simple table using SQL.
Simple Table from SQL

<CODE START>

%sql

DROP TABLE IF EXISTS Samp;

CREATE TABLE Samp AS
SELECT 1 AS Col
UNION ALL
SELECT 2 AS Col;


SELECT * FROM Samp

<CODE END>

We see that no configuration was necessary to create a SQL table from the Notebook.  Now that the table exists, we can query it from a different notebook connected to a different cluster, as long as both are within the same Workspace.
Different Notebook and Cluster

<CODE START>

%sql


SELECT * FROM Samp

<CODE END>

We're not limited to creating tables using SQL.  We can create tables using Python, R and Scala as well.
Simple Table from Python

<CODE START>

%python

from pyspark.sql import Row

pysamp = spark.createDataFrame([Row(Col=1),Row(Col=2)])

pysamp.write.saveAsTable('PySamp')

<CODE END>

<CODE START>

%sql

SELECT * FROM PySamp

<CODE END>

Simple Table from R

<CODE START>

%r

library(SparkR)

rsamp <- createDataFrame(data.frame(Col = c(1,2)))
registerTempTable(rsamp, "RSampT")
s = sql("CREATE TABLE RSamp AS SELECT * FROM RSampT")

dropTempTable("RSampT")

<CODE END>

<CODE START>

%sql


SELECT * FROM RSamp

<CODE END>

Simple Table from Scala

<CODE START>

%scala

case class samp(Col: Int)
val samp1 = samp(1)
val samp2 = samp(2)
val scalasamp = Seq(samp1, samp2).toDF()

scalasamp.write.saveAsTable("ScalaSamp")

<CODE END>

<CODE START>

%sql


SELECT * FROM ScalaSamp

<CODE END>

Each of these three languages has a different way of creating Spark Data Frames.  Once the Data Frames are created, Python and Scala both support the .write.saveAsTable() functionality to create a SQL table directly.  However, R requires us to create a Temporary Table first using the registerTempTable() function, then use that to create a Persisted Table via the sql() function.  This is a great segue into the next topic, Temporary Tables.

A Temporary Table, also known as a Temporary View, is similar to a table, except that it's only accessible within the Session where it was created.  This allows us to use SQL tables as intermediate stores without worrying about what else is running in other clusters, notebooks or jobs.  Since Temporary Views share the same namespace as Persisted Tables, it's advisable to ensure that they have unique names across the entire workspace.  Let's take a look.
Temporary View from SQL

<CODE START>

%sql

DROP TABLE IF EXISTS Tempo;

CREATE TEMPORARY VIEW Tempo AS
SELECT 1 AS Col
UNION ALL
SELECT 2 AS Col;


SELECT * FROM Tempo

<CODE END>

Same Notebook, Same Cluster
Same Notebook, Different Cluster
Different Notebook, Same Cluster

<CODE START>

%sql

SELECT * FROM Tempo

<CODE END>

We see that the Temporary View is only accessible from the Session that created it.  Just as before, we can create Temporary Views using Python, R and Scala as well.
Temporary View from Python

<CODE START>

%python

from pyspark.sql import Row

pysamp = spark.createDataFrame([Row(Col=1),Row(Col=2)])

pysamp.createOrReplaceTempView("PyTempo")

<CODE END>

<CODE START>

%sql


SELECT * FROM PyTempo

<CODE END>

Temporary View from R

<CODE START>

%r

library(SparkR)

rsamp <- createDataFrame(data.frame(Col = c(1,2)))

registerTempTable(rsamp, "RTempo")

<CODE END>

<CODE START>

%sql


SELECT * FROM RTempo

<CODE END>

Temporary View from Scala

<CODE START>

%scala

case class samp(Col: Int)
val samp1 = samp(1)
val samp2 = samp(2)
val scalasamp = Seq(samp1, samp2).toDF()

scalasamp.createOrReplaceTempView("ScalaTempo")

<CODE END>

<CODE START>

%sql


SELECT * FROM ScalaTempo

<CODE END>

Again, we see the similarities between Python and Scala, with both supporting the .createOrReplaceTempView() function.  R is the odd man out here, using the same registerTempTable() function we saw earlier.
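If we want to see both kinds of objects side by side, spark.catalog.listTables() lists everything in the current database and flags which entries are temporary.  A minimal sketch in Python:

<CODE START>

%python

# Persisted tables such as PySamp and ScalaSamp report isTemporary=False,
# while views such as PyTempo and ScalaTempo report isTemporary=True.
for t in spark.catalog.listTables():
  print(t.name, t.isTemporary)

<CODE END>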

These are great ways to create Persisted and Temporary Tables from data that we already have access to within the notebook.  However, Hive gives us access to something that is simply not possible with most other SQL technologies: External Tables.

Simply put, an External Table is a table built directly on top of a folder within a data source.  This means that the data is not hidden away in some proprietary SQL format.  Instead, the data remains completely accessible to outside systems in its native format.  The main benefit is the ability to run "live" queries on top of text data sources.  Every time a query is executed against the table, it runs against the current data in the folder.  This means that we don't have to run ETL jobs to load data into the table.  Instead, all we need to do is put the structured files in the folder and the queries will automatically surface the new data.

In the previous post, we mounted an Azure Blob Storage Container that contains a csv file.  Let's build an external table on top of that location.
Create External Table

<CODE START>

%sql

CREATE EXTERNAL TABLE Ext (
  age INT
  ,capitalgain INT
  ,capitalloss INT
  ,education STRING
  ,educationnum INT
  ,fnlwgt INT
  ,hoursperweek INT
  ,income STRING
  ,maritalstatus STRING
  ,nativecountry STRING
  ,occupation STRING
  ,race STRING
  ,relationship STRING
  ,sex STRING
  ,workclass STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION '/mnt/breakingbi/';

SELECT * FROM Ext
WHERE education != 'education'

<CODE END>

We see that it's as simple as defining the schema, file format and location.  The one downside is that, at the time of writing, Azure Databricks does not support skipping header rows.  So, we need to add a line of logic to filter out the header records, as the WHERE clause above does.  Before we move on to how to hide that issue, let's look at one neat piece of functionality of External Tables.  Not only do they support reading from file locations, they also support writing to them.  Let's duplicate all of the records in this table and see what happens to the mounted location.
Insert Into External Table
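A minimal sketch of such an insert, run through spark.sql() and assuming the Ext table created above:

<CODE START>

%python

# Append a copy of every record currently in the external table back into itself.
spark.sql("INSERT INTO Ext SELECT * FROM Ext")

<CODE END>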
Mounted Location Post-Insert
We see that the number of records in the table has doubled.  When we look at the mounted location, we also see that some new files have been created.  In typical Spark style, these files have very long, unique names, but the External Table doesn't care about any of that.
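To confirm both effects, we can count the rows and list the mounted folder; a minimal sketch:

<CODE START>

%python

# The row count should now be twice what it was before the insert.
print(spark.sql("SELECT COUNT(*) AS Cnt FROM Ext").first().Cnt)

# The insert writes new part files alongside the original adult.csv.
display(dbutils.fs.ls("/mnt/breakingbi/"))

<CODE END>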

Finally, let's look at how to clean up our external table so that it doesn't show the header rows anymore.  We can do this by creating a View (not to be confused with the Temporary View we discussed earlier).
Create View

<CODE START>

%sql

CREATE VIEW IF NOT EXISTS vExt AS
SELECT * FROM Ext
WHERE education != 'education';


SELECT * FROM vExt

<CODE END>

A View is nothing more than a canned SELECT query that can be queried as if it were a Table.  Views are a fantastic way to hide complex technical logic from end users.  All they need to do is query the view; they don't need to worry about the man behind the curtain.

We hope this post enlightened you to the power of using SQL within Azure Databricks.  While Python, R and Scala definitely have their strengths, the ability to combine them with SQL provides a level of functionality that isn't really matched by any other technology on the market today.  The official Azure Databricks SQL documentation can be found here.  Stay tuned for the next post where we'll dig into RDDs, Data Frames and Datasets.  Thanks for reading.  We hope you found this informative.

Brad Llewellyn
Service Engineer - FastTrack for Azure
Microsoft
@BreakingBI
www.linkedin.com/in/bradllewellyn
llewellyn.wb@gmail.com

Monday, August 5, 2019

Azure Databricks: Databricks File System (DBFS)

Today, we're going to talk about the Databricks File System (DBFS) in Azure Databricks.  If you haven't read the previous posts in this series, Introduction, Cluster Creation and Notebooks, they may provide some useful context.  You can find the files from this post in our GitHub Repository.  Let's move on to the core of this post, DBFS.

As we mentioned in the previous post, there are three major concepts for us to understand about Azure Databricks: Clusters, Code and Data.  For this post, we're going to talk about the storage layer underneath Azure Databricks, DBFS.  Since Azure Databricks manages Spark clusters, it requires an underlying Hadoop Distributed File System (HDFS).  This is exactly what DBFS is.  Basically, HDFS is the low-cost, fault-tolerant, distributed file system that makes the entire Hadoop ecosystem work.  We may dig deeper into HDFS in a later post.  For now, you can read more about HDFS here and here.

Let's start by looking at the current DBFS structure.
Add Data
We can do this by selecting the "Data" button from the left navigation pane, then selecting the "Add Data" button at the top of the fly-out.  It's important to note that databases and tables are not accessible unless there is a cluster running.  However, the data is still present in DBFS. It's just not currently accessible as a table.
DBFS
In the "Create New Table" window, we can see the contents of DBFS by selecting the "DBFS" tab and navigating through the folders.  As seen in both screenshots, we don't have any tables currently or any data stored in DBFS.  So, there's not much to see here.  However, there are actually a few other folders that can't be seen in this UI.  We can access these by using the %fs commands within a Notebook.  If you're not familiar with Notebooks, check out our previous post.
fs ls

<CODE START>

%fs

ls

<CODE END>

We can use the familiar ls command to see what's in each of these folders.
FileStore

<CODE START>

%fs

ls /FileStore/

<CODE END>

The "FileStore" folder contains all of our accessible data, including Database and Table data, plots created in Notebooks and JAR files.  You can read more about the FileStore here.
databricks-datasets

<CODE START>

%fs

ls /databricks-datasets/

<CODE END>

The "databricks-datasets" folder contain a number of different sample datasets to use.  We'll be sure to leverage some of these throughout this series.  There are also some README files that we could look at.  We'll see how to do this later in this post.
databricks-results

<CODE START>

%fs

ls /databricks-results/

<CODE END>

There seems to be something blocking us from viewing the "databricks-results" folder.  It's not clear whether this is by design or some type of bug.  We would guess that this folder is used by the Databricks platform to store intermediate results, but that's just a guess based on the folder name.
ml

<CODE START>

%fs

ls /ml/

<CODE END>

The "ml" folder is currently empty.  Hypothesizing again, we think this folder may be used for MLFlow functionality.  This is a topic that we're looking forward to exploring in a later post.  You can read more about MLFlow here.
tmp

<CODE START>

%fs

ls /tmp/

<CODE END>

The "tmp" folder contains a number of subfolders that varies based on the size of the environment.  Guessing one final time, it looks like this folder houses Hive temp files used for intermediate calculations.  This makes us question or our earlier guess about the "databricks-results" folder.  We'll be sure to come back with more information once we find out more about these additional folders.

Interestingly, there's a layer of folders outside of DBFS, on the driver's local file system, that can be accessed using %sh commands.
sh ls

<CODE START>

%sh

ls /

<CODE END>

One of the folders we see here is "dbfs".  Obviously this takes us to the DBFS folders we were looking at earlier.  More advanced Databricks users could potentially delve deeper into some of these other folders.
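As an aside, the dbfs folder means DBFS is also reachable through the driver's local file system, so ordinary Python file APIs work against it; a minimal sketch, assuming the /dbfs mount is available on the cluster:

<CODE START>

%python

import os

# DBFS paths appear under /dbfs on the driver's local file system,
# so standard Python file operations can read them.
print(os.listdir("/dbfs/"))
print(os.listdir("/dbfs/databricks-datasets/")[:10])

<CODE END>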

Now that we've poked around the folders using ls, let's take a look at the README files we found earlier.  To do this, we first need to copy them from the "databricks-datasets" folder to the "FileStore" folder.
Copy Readme Files

<CODE START>

%fs

cp /databricks-datasets/README.md /FileStore/readme/README.md

<CODE END>

<CODE START>

%fs

cp /databricks-datasets/SPARK_README.md /FileStore/readme/SPARK_README.md

<CODE END>

After copying the files, we can download them from any web browser using the following format:
https://<INSTANCE>/files/<FILE_PATH>?o=<WORKSPACE_ID>
The <FILE_PATH> comes from our earlier fs command, and the <INSTANCE> and <WORKSPACE_ID> can be found in the URL of the Databricks workspace.  For instance, our Databricks URL is
https://eastus2.azuredatabricks.net/?o=3558890778145077#
This parses to
<INSTANCE> = eastus2.azuredatabricks.net
<FILE_PATH> = readme/README.md
<WORKSPACE_ID> = 3558890778145077
So, the final URL is
https://eastus2.azuredatabricks.net/files/readme/README.md?o=3558890778145077
This URL provides a download for the README.md file.  Changing <FILE_PATH> to "readme/SPARK_README.md" will yield the SPARK_README.md file.  This methodology can be used to download any file stored under the FileStore folder.
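For convenience, the URL can also be assembled programmatically; a minimal sketch using the values above:

<CODE START>

%python

# Build the FileStore download URL from its parts.
instance = "eastus2.azuredatabricks.net"
file_path = "readme/README.md"
workspace_id = "3558890778145077"

print("https://{0}/files/{1}?o={2}".format(instance, file_path, workspace_id))

<CODE END>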

The files can be opened in almost any text editor or markdown parser.  They contain basic information about Databricks and Spark.

Finally, let's take a look at the production-grade way to work with data in Databricks.  We can accomplish this via Mounting.  Mounting allows us to reference external file stores, such as Azure Blob Storage, Azure Data Lake Store Gen1 and Azure Data Lake Store Gen2, as if they are part of DBFS.  The dbutils.fs.mount() function can accomplish this, with the syntax varying slightly between Scala and Python.  In Python, we can use the following command to mount an Azure Blob Storage account:
dbutils.fs.mount(
  source = "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net",
  mount_point = "/mnt/<mount-name>",
  extra_configs = {"fs.azure.account.key.<your-storage-account-name>.blob.core.windows.net": "<access-key>"})
Mount Azure Blob Storage

<CODE START>


#dbutils.fs.mount(
#  source = "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net",
#  mount_point = "/mnt/<mount-name>",
#  extra_configs = {"fs.azure.account.key.<your-storage-account-name>.blob.core.windows.net":"<access-key>"})

dbutils.fs.mount(
  source = "wasbs://blob@breakingbi.blob.core.windows.net",
  mount_point = "/mnt/breakingbi",
  extra_configs = {"fs.azure.account.key.breakingbi.blob.core.windows.net":"aKNiYJntZUA9asI/uYH1KnB2OzydeD8akiEnhp4s4LKedqXRjNCcQ0aLRYIhMbt/mMEr3sJAmYc1lrCbBeHfIw=="})

<CODE END>

<CODE START>


%fs

ls /mnt/breakingbi

<CODE END>

By looking at the mounted location with ls, we can see the adult.csv file that we placed there.  You can read more about mounting at the following links: Azure Blob Storage, Azure Data Lake Store Gen1 and Azure Data Lake Store Gen2.  Mounting is especially helpful as it allows us to create SQL tables that sit on top of live data in a mounted external data store, but that's a topic for another day.  As a sidenote, access keys and passwords should never be stored as plain text in Notebooks.  Instead, Azure Databricks offers a Secrets API backed by Azure Key Vault.  We've covered this briefly in a previous post and will likely do so again in more depth.
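As an illustration of that pattern, here is a minimal sketch of the same mount with the access key pulled from a secret scope via dbutils.secrets.get(); the scope and key names ("blob-storage" and "breakingbi-access-key") are hypothetical placeholders, not values from this environment:

<CODE START>

%python

# Hypothetical secret scope and key names; replace with your own.
# Run dbutils.fs.unmount("/mnt/breakingbi") first if the path is already mounted.
access_key = dbutils.secrets.get(scope = "blob-storage", key = "breakingbi-access-key")

dbutils.fs.mount(
  source = "wasbs://blob@breakingbi.blob.core.windows.net",
  mount_point = "/mnt/breakingbi",
  extra_configs = {"fs.azure.account.key.breakingbi.blob.core.windows.net": access_key})

<CODE END>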

Hopefully, this post helped unravel a little of what's going on inside the Databricks File System.  Leveraging DBFS is critical to creating an efficient, secure, production-quality Databricks environment.  The official Azure Databricks DBFS documentation can be found here.  Stay tuned for the next post where we'll dig into the SQL Database underlying Azure Databricks.  Thanks for reading.  We hope you found this informative.

Brad Llewellyn
Service Engineer - FastTrack for Azure
Microsoft
@BreakingBI
www.linkedin.com/in/bradllewellyn
llewellyn.wb@gmail.com