Monday, April 22, 2019

Data Science In Power BI: Spark MLlib

Today, we're going to talk about Spark MLlib within Power BI.  If you haven't read the earlier posts in this series, Introduction, Getting Started with R Scripts, Clustering, Time Series Decomposition, Forecasting, Correlations, Custom R Visuals, R Scripts in Query Editor, Python, Azure Machine Learning Studio, Stream Analytics, Stream Analytics with Azure Machine Learning Studio, HDInsight Hive and Databricks Spark, they may provide some useful context.  You can find the files from this post in our GitHub Repository.  Let's move on to the core of this post, Spark MLlib.

Technically, we won't be running Spark MLlib commands within Power BI in this post.  There's probably a way to pull that off using the built-in Python functionality.  Alas, that's a bit more complex than we'd like to get into here.  Instead, we're going to use Spark MLlib functionality within Databricks to generate some predictions for us, then expose those predictions using Power BI.  So, what's Spark MLlib?

MLlib is one of the primary extensions of Spark, along with Spark SQL, Spark Streaming and GraphX.  It is a machine learning framework built from the ground up to be massively scalable and operate within Spark.  This makes it an excellent choice for machine learning applications that need to crunch extremely large amounts of data.  You can read more about Spark MLlib here.

In order to leverage Spark MLlib, we obviously need a way to execute Spark code.  In our minds, there's no better tool for this than Azure Databricks.  In the previous post, we covered the creation of an Azure Databricks environment.  We're going to reuse that environment for this post as well.  We'll also use the same dataset that we've been using, which contains information about individual customers.  This dataset was originally designed to predict Income based on a number of factors.  However, we left the income out of this dataset a few posts back for reasons that were important then.  So, we're actually going to use this dataset to predict "Hours Per Week" instead.

Spoiler: The resulting model is terrible, but it lets us show off some cool functionality.

To accomplish this, we're going to use PySpark.  PySpark is a Python library that allows us to execute Spark commands using Python.  Given the meteoric rise of Python as a Data Engineering and Data Science programming language, as well as its ease of coding when compared to Scala, we find it to be a great option for Spark programming.  You can read more about PySpark here.
Import Data
<CODE START>

df = sqlContext.read.parquet("/mnt/datadefaultparquet")


display(df)

<CODE END>

The first step is to import our data.  Since we previously mounted the Parquet data, this is a relatively simple use of "sqlContext".  The MLlib functionality that we are going to leverage today doesn't like strings very much.  In a real-world use case, we would use dummy encoding, also known as indicator variables or one-hot encoding, to solve this problem.  We've covered this process using Azure Machine Learning Studio in this post.  It's not a basic task in PySpark and deserves its own post.  So, we're just going to remove all of the string columns.
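For reference, here's a rough sketch of what dummy encoding a single string column might look like in PySpark.  We won't use it here; the column names come from our dataset, and note that the encoder class is named "OneHotEncoderEstimator" on Spark 2.3/2.4 and "OneHotEncoder" on Spark 3.0 and later.

<CODE START>

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder

# Rough sketch only: convert one string column to numeric category indexes,
# then expand those indexes into one-hot (dummy) vectors.
# On Spark 2.3/2.4, swap OneHotEncoder for OneHotEncoderEstimator.
indexer = StringIndexer(inputCol='workclass', outputCol='workclass_idx')
encoder = OneHotEncoder(inputCols=['workclass_idx'], outputCols=['workclass_vec'])

df_dummy = Pipeline(stages=[indexer, encoder]).fit(df).transform(df)

df_dummy.select('workclass', 'workclass_idx', 'workclass_vec').show(5)

<CODE END>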
Clean Data
<CODE START>

df_clean = df.drop('customerid', 'workclass', 'education', 'maritalstatus', 'occupation', 'relationship', 'race', 'sex', 'nativecountry')


display(df_clean)

<CODE END>

Now that our dataset is cleaned up and ready for modelling, we need to "vectorize" it.  Basically, MLlib doesn't like having multiple feature columns.  Instead, it wants us to provide a single "vector" column that contains all of our features.  This is a requirement driven by the linear mathematics behind the scenes.  However, we're not sure why PySpark requires this, as it seems easy enough for it to create on its own without our help.  Alas, it is a requirement nonetheless.  Fortunately, there's a built-in function to handle it.
Vectorize Features
<CODE START>

from pyspark.ml.feature import VectorAssembler

df_vec = \
  VectorAssembler( \
    inputCols=[ \
      c \
      for c in df_clean.columns \
      if c != 'hoursperweek' \
    ] \
    ,outputCol="features" \
  ) \
  .transform(df_clean) \
  .select(['hoursperweek', 'features'])


df_vec.show()

<CODE END>

First, we have to import the "VectorAssembler()" function from the "pyspark.ml.feature" library.  We don't need to install these libraries on the cluster, as they come prepackaged with Databricks clusters.  However, we still need to import them into our notebooks.  Then, we create a new "df_vec" data frame by telling the "VectorAssembler()" to vectorize all features EXCEPT "hoursperweek" into a single column called "features".  We then see the result of this operation.

There are a few important syntactical notes here.  First, Python treats a line break as the end of a command.  So, to spread a single command across multiple lines for readability, we have to add a "\" to the end of each line (wrapping the whole expression in parentheses works as well).  This tells Python that the command continues on the next line.  Also, the "display()" function renders some strangeness when we apply it to vector columns.  So, we switched over to the ".show()" function.

Next, we need to create training and testing splits of our data.
Split Data
<CODE START>

(df_vec_train, df_vec_test) = df_vec.randomSplit([0.7, 0.3])

{
  'Train': {
    'Rows': df_vec_train.count()
    ,'Cols': len(df_vec_train.columns)
  }
  ,'Test': {
    'Rows': df_vec_test.count()
    ,'Cols': len(df_vec_test.columns)
  }

}

<CODE END>

We can use the built-in ".randomSplit()" function to achieve this.  It returns multiple data frames at once, so we capture them using the "(df1, df2)" tuple syntax.  Finally, we can showcase the important aspects of these data frames by creating a dictionary, which uses the "{}" syntax.  We can see that both data frames have two columns, while the training data frame has many more observations than the testing data frame due to the 70/30 split.  Next, we can train our linear regression model.
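One quick aside before we do: ".randomSplit()" also accepts an optional seed if we need the split to be reproducible from run to run.  A minimal sketch (the seed value is arbitrary):

<CODE START>

# Optional: pass a seed so the 70/30 split is identical every time the notebook runs.
(df_vec_train, df_vec_test) = df_vec.randomSplit([0.7, 0.3], seed=42)

<CODE END>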
Linear Regression
<CODE START>

from pyspark.ml.regression import LinearRegression

df_linreg = \
  LinearRegression(featuresCol='features', labelCol='hoursperweek') \
  .fit(df_vec_train)

df_linreg_pred = df_linreg.transform(df_vec_test)


df_linreg_pred.show()

<CODE END>

To accomplish this, we have to import another function, "LinearRegression()", this time from the "pyspark.ml.regression" library.  One of the interesting aspects of PySpark that took us some time to understand is that PySpark leverages a "lazy" execution model.  This means that commands do not run until there is some need for them to provide output.  This allows some interesting coding options.  Notice the "LinearRegression(featuresCol='features', labelCol='hoursperweek')" command that we run here.  This command is not tied to a dataset, meaning that it can't possibly return a result.  It is simply an empty mathematical formula waiting for a dataset to use it on.  However, it's still completely valid PySpark because, as we said, nothing is executed until it's needed.  So, once we apply the ".fit(df_vec_train)" function, it has a dataset and can begin doing its magic.  The lazy execution model is actually more complex than this (it mainly describes how Spark defers transformations until an action asks for output), but this is a decent example to showcase the general idea.
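As a small illustration of the lazy model itself (using the "df_clean" data frame from earlier), transformations like ".filter()" only build up an execution plan, and nothing actually runs until an action such as ".count()" or ".show()" asks for output:

<CODE START>

# A transformation: this only describes the work, so no Spark job runs yet.
df_over_40 = df_clean.filter(df_clean.age > 40)

# An action: this forces Spark to actually execute the plan.
df_over_40.count()

<CODE END>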

Moving on, now that we have predictions from our linear regression model, we can calculate our evaluation metrics to see how good our model is.  BRACE YOURSELVES!
Model Evaluation
<CODE START>

from pyspark.ml.evaluation import RegressionEvaluator

reg_eval = RegressionEvaluator(predictionCol="prediction", labelCol='hoursperweek')
df_linreg_r2 = reg_eval.evaluate(df_linreg_pred, {reg_eval.metricName: "r2"})
df_linreg_rmse = reg_eval.evaluate(df_linreg_pred, {reg_eval.metricName: "rmse"})

{
  'R-Squared': df_linreg_r2
  ,'Root Mean Squared Error': df_linreg_rmse

}

<CODE END>

Wow....that is one terrible model.  If an R-Squared of 4% isn't a record low, then it must be close.  Regardless, this was a showcase of functionality given an incomplete dataset.  We were able to calculate these values using the "RegressionEvaluator()" function from the "pyspark.ml.evaluation" library.

Finally, we need to store these results in the Databricks database for access from Power BI.
Predictions
<CODE START>

df_linreg_pred.write.saveAsTable("Predictions")

<CODE END>
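One caveat: by default, ".saveAsTable()" fails if the table already exists, which matters if we rerun the notebook.  A sketch of one way to handle that:

<CODE START>

# Overwrite the table if it already exists (useful when rerunning the notebook).
df_linreg_pred.write.mode("overwrite").saveAsTable("Predictions")

<CODE END>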

<CODE START>

%sql


SELECT * FROM Predictions LIMIT 10;

<CODE END>

Now that our data is stored in the Predictions table, we can access it from Power BI.  As in the previous post, you can find the connection walkthrough here.
Power BI Report
This quick scatterplot of our predictions versus the actual values tells the same story as our R-Squared of 4%.  If our model were perfect, these points would all fall exactly on the line.  As an aside, this line set to "y=x" is called a "Ratio Line" in Power BI.

Hopefully, this post piqued your interest in combining the massive computing power of Spark MLlib with the ease of use and reporting capabilities of Power BI.  This blog series has been very interesting to write and we've learned a ton along the way.  We're not sure what the next series will be, but we hope you'll tune in!  Thanks for reading.  We hope you found this informative.

Brad Llewellyn
Service Engineer - FastTrack for Azure
Microsoft
@BreakingBI
www.linkedin.com/in/bradllewellyn
llewellyn.wb@gmail.com

Monday, April 1, 2019

Data Science in Power BI: Databricks Spark

Today, we're going to talk about Databricks Spark within Power BI.  If you haven't read the earlier posts in this series, Introduction, Getting Started with R Scripts, Clustering, Time Series Decomposition, Forecasting, Correlations, Custom R Visuals, R Scripts in Query Editor, Python, Azure Machine Learning Studio, Stream Analytics, Stream Analytics with Azure Machine Learning Studio and HDInsight Hive, they may provide some useful context.  You can find the files from this post in our GitHub Repository.  Let's move on to the core of this post, Databricks Spark.

Databricks is a managed Spark framework, similar to what we saw with HDInsight in the previous post.  The major difference between the two technologies is that HDInsight is more of a managed provisioning service for Hadoop, while Databricks is more like a managed Spark platform.  In other words, HDInsight is a good choice if we need the ability to manage the cluster ourselves, but don't want to deal with provisioning, while Databricks is a good choice when we simply want to have a Spark environment for running our code with little need for maintenance or management.

Azure Databricks is not a Microsoft product.  It is owned and managed by the company Databricks and available in Azure and AWS.  However, Databricks is a "first party offering" in Azure.  This means that Microsoft offers the same level of support, functionality and integration as it would with any of its own products.  You can read more about Azure Databricks here, here and here.

To get started with Azure Databricks, we need to create an account.  We can do this in the Azure Portal.
Databricks Account Creation
Now that we have an account, we can access our Azure Databricks Workspace through the Azure portal.
Launch Workspace
When we navigate to our Azure Databricks resource, we simply click the "Launch Workspace" button.
Azure Databricks Portal
This takes us to the Azure Databricks Portal.  Azure Databricks has three major components that we will leverage.  First, it has a SparkSQL database behind the scenes that we can store data in and query from.  Second, it stores our code as notebooks that we can use to interactively explore.  Finally, it allows us to create clusters that quickly spin up and spin down, separately from our code.  This means that we can destroy our clusters (and stop paying for them), without losing any of our code or data.
New Cluster
Let's start by creating our cluster definition.
Cluster Definition
Azure Databricks allows us to easily create Spark clusters with the ability to auto-scale.  This is a huge differentiator from HDInsight, which typically requires us to destroy and recreate the cluster if we want to add nodes.  In our case, we just want the smallest possible cluster, as our dataset is pretty small.  The cluster creation takes a few minutes.
New Notebook
Create Notebook
Once the cluster is created, we can create a notebook and select its default language.  One of the really cool things about Databricks is that we can write multiple query languages within the same notebook.  It's extremely simple to ingest data using Scala, process it using SQL, build predictive models using R and visualize it using Python, all inside a single notebook.  The language we select when creating the notebook is simply the default language, but it can easily be overridden using "%sql", "%scala", "%python" and "%r".  There's even a "%md" command for writing formatted text using markdown.  Now that we have a cluster and a notebook, let's use Scala to query data from our blob store.
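Before we do, here's a tiny illustration of the magic commands in action.  Assuming the notebook's default language is Scala, a cell runs Python simply because it starts with "%python":

<CODE START>

%python

# This cell runs Python inside a Scala notebook because of the magic command above.
print("Hello from a Python cell")

<CODE END>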
Read Avro from Blob
<CODE START>

%scala

spark.conf.set(
  "fs.azure.account.key.datascienceinpowerbibig.blob.core.windows.net", <Storage Account Access Key>
);

val df = spark.read.format("avro").load("wasbs://interactivequery@datascienceinpowerbibig.blob.core.windows.net/data");

df.show();

<CODE END>

We were able to do this in a few basic lines of code.  Now that we can access our data, let's discuss the goal of this post.  In the previous post, we mentioned that Hive doesn't perform well against lots of small files.  This is true for Spark as well.  One technique for handling this is to have jobs that coalesce a large number of small files into a single large file.  These jobs can be run daily, weekly or monthly depending on the data needs of the organization.  Since we've already stored the entire dataset in the "df" Data Frame, it's trivial to write all of this out to a set of randomly partitioned files or a single file.
Write Avro to Blob
<CODE START>

%scala

// Write using default partitions

df.write.format("avro").save("wasbs://interactivequery@datascienceinpowerbibig.blob.core.windows.net/datadefault/")

// Write using single partition

df.repartition(1).write.format("avro").save("wasbs://interactivequery@datascienceinpowerbibig.blob.core.windows.net/datasinglefile/")

<CODE END>

Since Spark is a distributed processing framework, the default state is to read and write files in a partitioned state.  However, we can override this by using the "repartition()" function.  Let's take a look at the results.
Default Partitions

Single Partition
Immediately, we notice the awkward naming convention that Spark uses for its files.  This is a built-in feature of Hadoop and cannot be directly altered.  However, it is possible to use File System commands to rename the files after they've been written, if necessary.  We also see that Spark writes out some status files alongside the data.  These aren't a major concern, as most Hadoop technologies know how to leverage or ignore these files as needed.

Seeing this data leads us to two important questions.  First, did coalescing the small files reduce the total data volume?
Storage Statistics
Fortunately, Azure Storage Explorer has some built-in functionality for this.  It turns out that we saw an 83% size reduction using the default partitioning and an 87% size reduction using the single partitioning.  More importantly, what kind of read performance do we get off of these files?
Read Statistics
This test showed that the small files took 9.37 seconds, the default partitioned files took 0.67 seconds and the single file took 0.54 seconds.  These numbers are even better than the Storage Statistics, boasting a 93% read time reduction for the default partitioning and a 94% read reduction for the single file.  What's even more interesting here is that there doesn't seem to be a substantial difference between using the default partitioning and using a single file.  So, we'll just stick with the defaults moving forward.
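For reference, here's a rough sketch of how a comparison like this could be run from a Python cell using simple wall-clock timing.  The folder paths match the ones used above, the storage key is assumed to have already been set via "spark.conf" as before, and your exact numbers will vary:

<CODE START>

%python

import time

# Time a read + count against each folder and compare.
paths = {
  'small files': "wasbs://interactivequery@datascienceinpowerbibig.blob.core.windows.net/data"
  ,'default partitions': "wasbs://interactivequery@datascienceinpowerbibig.blob.core.windows.net/datadefault"
  ,'single file': "wasbs://interactivequery@datascienceinpowerbibig.blob.core.windows.net/datasinglefile"
}

for name, path in paths.items():
  start = time.time()
  spark.read.format("avro").load(path).count()
  print(name, round(time.time() - start, 2), "seconds")

<CODE END>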

Now that we've compiled all of our small files together, we need to expose the data in a way that Power BI can read efficiently.  Power BI can read from Blob Storage, but it's messy and we won't try that.  Fortunately, Databricks has a built-in Spark SQL database that we can write our data to.
createOrReplaceTempView
<CODE START>

%scala


df.createOrReplaceTempView("Customers")

<CODE END>

<CODE START>

%sql


SELECT * FROM Customers LIMIT 10

<CODE END>

Databricks allows us to leverage the "createOrReplaceTempView()" function to register our Data Frame as a temporary table.  Unlike in the previous post, this table is not a permanent object tied directly to the files in blob storage.  Instead, it's a temporary view over the "df" Data Frame that only lives for the current cluster session.  Just like with HDInsight, it is possible to create an external table that connects to remote file storage.  In order to do this, we must first mount the storage container.
Mount Storage Account
<CODE START>

%scala

dbutils.fs.mount(
  source = "wasbs://interactivequery@datascienceinpowerbibig.blob.core.windows.net/datadefault",
  mountPoint = "/mnt/datadefault",
  extraConfigs = Map("fs.azure.account.key.datascienceinpowerbibig.blob.core.windows.net" -> <Storage Account Access Key>))

<CODE END>

This command allows us to access the data in the "datadefault" folder within the "interactivequery" container, simply by referencing the "/mnt/datadefault" location as if it were local.  We can use this to create our external table.  Since it's not good practice to put account keys directly in our code, it's also possible to store them using the Secrets API and reference them with the "dbutils.secrets.get()" function.
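Here's a hedged sketch of what that might look like from a Python cell.  The secret scope and key names below are hypothetical; the real values come from however the secret was registered via the Databricks CLI or Secrets API.

<CODE START>

%python

# Hypothetical scope and key names; the storage key itself never appears in the notebook.
storage_key = dbutils.secrets.get(scope = "datascienceinpowerbi", key = "storage-account-key")

dbutils.fs.mount(
  source = "wasbs://interactivequery@datascienceinpowerbibig.blob.core.windows.net/datadefault",
  mount_point = "/mnt/datadefault",
  extra_configs = {"fs.azure.account.key.datascienceinpowerbibig.blob.core.windows.net": storage_key})

<CODE END>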

Unfortunately, while writing this, we discovered that there is a bug affecting the use of Avro for this operation.  If we use the Avro data, we end up with a strange table.
CustomersBlob (Avro)
<CODE START>

%sql

CREATE EXTERNAL TABLE CustomersBlob(
  customerid string,
  age int,
  workclass string,
  fnlwgt int,
  education string,
  educationnum int,
  maritalstatus string,
  occupation string,
  relationship string,
  race string,
  sex string,
  capitalgain int,
  capitalloss int,
  hoursperweek int,
  nativecountry string
)
STORED AS AVRO
LOCATION '/mnt/datadefault/';


SELECT * FROM CustomersBlob LIMIT 10;

<CODE END>

Fortunately, saving the same data in Parquet format resolves the problem.
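Here's a rough sketch of one way the same data could be re-saved as Parquet from a Python cell.  The destination folder name is illustrative; it could then be mounted (for example as "/mnt/datadefaultparquet") the same way as above.

<CODE START>

%python

# Read the mounted Avro data and re-save it as Parquet.
spark.read.format("avro").load("/mnt/datadefault") \
  .write.format("parquet") \
  .save("wasbs://interactivequery@datascienceinpowerbibig.blob.core.windows.net/datadefaultparquet/")

<CODE END>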
CustomersBlob (Parquet)
Now that this data is exposed through Databricks, we can access it from Power BI using the Spark connector.  First, we need to find the connection information.  In order to connect, we need the JDBC/ODBC connection information for the cluster, as well as an access token.  We won't show this process step-by-step, but you can find it here.
Customers
Hopefully, this post enlightened you to the possibilities of combining the processing power of Databricks Spark with Power BI.  Databricks is one of the coolest new kids on the block and provides an incredibly simple mechanism for implementing Python, R, Scala and SQL code against the open-source Spark framework.  Stay tuned for the final post in this series where we'll dig into Spark MLlib.  Thanks for reading.  We hope you found this informative.

Brad Llewellyn
Service Engineer - FastTrack for Azure
Microsoft
@BreakingBI
www.linkedin.com/in/bradllewellyn
llewellyn.wb@gmail.com