Monday, April 22, 2019

Data Science In Power BI: Spark MLlib

Today, we're going to talk about Spark MLlib within Power BI.  If you haven't read the earlier posts in this series, Introduction, Getting Started with R Scripts, Clustering, Time Series Decomposition, Forecasting, Correlations, Custom R Visuals, R Scripts in Query Editor, Python, Azure Machine Learning Studio, Stream Analytics, Stream Analytics with Azure Machine Learning Studio, HDInsight Hive and Databricks Spark, they may provide some useful context.  You can find the files from this post in our GitHub Repository.  Let's move on to the core of this post, Spark MLlib.

Technically, we won't be running Spark MLlib commands within Power BI in this post.  There's probably a way to pull that off using the built-in Python functionality.  Alas, that's a bit more complex than we'd like to get into here.  Instead, we're going to use Spark MLlib functionality within Databricks to generate some predictions for us, then expose those predictions using Power BI.  So, what's Spark MLlib?

MLlib is one of the primary extensions of Spark, along with Spark SQL, Spark Streaming and GraphX.  It is a machine learning framework built from the ground up to be massively scalable and operate within Spark.  This makes it an excellent choice for machine learning applications that need to crunch extremely large amounts of data.  You can read more about Spark MLlib here.

In order to leverage Spark MLlib, we obviously need a way to execute Spark code.  In our minds, there's no better tool for this than Azure Databricks.  In the previous post, we covered the creation of an Azure Databricks environment.  We're going to reuse that environment for this post as well.  We'll also use the same dataset that we've been using, which contains information about individual customers.  This dataset was originally designed to predict Income based on a number of factors.  However, we left the income out of this dataset a few posts back for reasons that were important then.  So, we're actually going to use this dataset to predict "Hours Per Week" instead.

Spoiler: The resulting model is terrible, but it lets us show off some cool functionality.

To accomplish this, we're going to use PySpark.  PySpark is a Python library that allows us to execute Spark commands using Python.  Given the meteoric rise of Python as a Data Engineering and Data Science programming language, as well as its ease of coding when compared to Scala, we find it to be a great option for Spark programming.  You can read more about PySpark here.
Import Data
<CODE START>

df = sqlContext.read.parquet("/mnt/datadefaultparquet")


display(df)

<CODE END>

The first step is to import our data.  Since we previously mounted the Parquet data, this is a relatively simple use of "sqlContext".  The MLlib functionality that we are going to leverage today doesn't like strings very much.  In a real-world use case, we would use dummy encoding, also known as indicator variables or one-hot encoding, to solve this problem.  We've covered this process using Azure Machine Learning Studio in this post.  It's not a trivial task in PySpark and deserves its own post.  So, we're just going to remove all of the string columns.
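As an aside, here's a minimal sketch of what that encoding could look like for a single string column using PySpark's built-in feature transformers.  This is illustrative only and isn't used in the rest of this post; it assumes a Spark 2.3/2.4 runtime, where the estimator is named "OneHotEncoderEstimator" (it was renamed "OneHotEncoder" in Spark 3).
<CODE START>

# Illustrative sketch only: index one string column, then one-hot encode it.
# Assumes Spark 2.3/2.4; on Spark 3+, use OneHotEncoder instead.
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator

# Map the string values of "workclass" to numeric category indices.
indexer = StringIndexer(inputCol='workclass', outputCol='workclass_idx')
df_indexed = indexer.fit(df).transform(df)

# Expand those indices into a sparse one-hot vector column.
encoder = OneHotEncoderEstimator(inputCols=['workclass_idx'], outputCols=['workclass_vec'])
df_encoded = encoder.fit(df_indexed).transform(df_indexed)

display(df_encoded)

<CODE END>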
Clean Data
<CODE START>

df_clean = df.drop('customerid', 'workclass', 'education', 'maritalstatus', 'occupation', 'relationship', 'race', 'sex', 'nativecountry')


display(df_clean)

<CODE END>

Now that our dataset is cleaned up and ready for modelling, we need to "vectorize" it.  Basically, MLlib doesn't like having multiple feature columns.  Instead, it wants us to provide a single "vector" column that contains all of our features.  This is a requirement driven by the linear algebra happening behind the scenes.  However, we're not sure why PySpark requires this, as it seems easy enough for it to create on its own without our help.  Alas, it is a requirement nonetheless.  Fortunately, there's a built-in function to handle it.
Vectorize Features
<CODE START>

from pyspark.ml.feature import VectorAssembler

df_vec = \
  VectorAssembler( \
    inputCols=[ \
      c \
      for c in df_clean.columns \
      if c != 'hoursperweek' \
    ] \
    ,outputCol="features" \
  ) \
  .transform(df_clean) \
  .select(['hoursperweek', 'features'])


df_vec.show()

<CODE END>

First, we have to import the "VectorAssembler()" function from the "pyspark.ml.feature" library.  We don't need to install these libraries on the cluster, as they come prepackaged with Databricks clusters.  However, we still need to import them into our notebooks.  Then, we create a new "df_vec" data frame by telling the "VectorAssembler()" to vectorize all features EXCEPT "hoursperweek" into a single column called "features".  We then see the result of this operation.

There are a few important syntactical notes here.  First, Python treats a line break as the end of a statement unless the break falls inside unclosed brackets.  So, to clean up our code using multiple lines, we add a "\" to the end of each line.  This tells Python that the command continues on the next line.  Also, the "display()" function renders some strangeness when we apply it to vector columns.  So, we switched over to the ".show()" function.
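As a further aside, the backslashes are only strictly necessary when a line break falls outside of brackets.  Since Python allows implicit line continuation inside unclosed parentheses and square brackets, the same cell could be written without them.  Here's a quick sketch of that alternative:
<CODE START>

# The same VectorAssembler call, relying on implicit line continuation
# inside the parentheses and brackets instead of trailing backslashes.
from pyspark.ml.feature import VectorAssembler

df_vec = VectorAssembler(
    inputCols=[c for c in df_clean.columns if c != 'hoursperweek'],
    outputCol="features"
).transform(df_clean).select(['hoursperweek', 'features'])

df_vec.show()

<CODE END>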

Next, we need to create training and testing splits of our data.
Split Data
<CODE START>

(df_vec_train, df_vec_test) = df_vec.randomSplit([0.7, 0.3])

{
  'Train': {
    'Rows': df_vec_train.count()
    ,'Cols': len(df_vec_train.columns)
  }
  ,'Test': {
    'Rows': df_vec_test.count()
    ,'Cols': len(df_vec_test.columns)
  }

}

<CODE END>

We can use the built-in ".randomSplit()" function to achieve this.  This is one of those interesting PySpark functions that returns multiple data frames at once.  Therefore, we need to capture them using the "(df1, df2)" syntax.  Finally, we can showcase the important aspects of these data frames by creating a dictionary, which uses the "{}" syntax.  We can see that both data frames have two columns, while the training data frame has many more observations than the testing data frame due to the 70/30 split.  Next, we can train our linear regression model.
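One quick aside before we do: ".randomSplit()" also accepts an optional seed.  If we want the exact same 70/30 split every time the notebook runs, a sketch like this would do it (the seed value itself is arbitrary):
<CODE START>

# Reproducible 70/30 split using an explicit (arbitrary) seed.
(df_vec_train, df_vec_test) = df_vec.randomSplit([0.7, 0.3], seed=42)

<CODE END>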
Linear Regression
<CODE START>

from pyspark.ml.regression import LinearRegression

df_linreg = \
  LinearRegression(featuresCol='features', labelCol='hoursperweek') \
  .fit(df_vec_train)

df_linreg_pred = df_linreg.transform(df_vec_test)


df_linreg_pred.show()

<CODE END>

To accomplish this, we have to import another function, "LinearRegression()", this time from the "pyspark.ml.regression" library.  One of the interesting aspects of PySpark that took us some time to understand is that PySpark leverages a "lazy" execution model.  This means that commands do not run until there is some need for them to provide output.  This allows some interesting coding options.  Notice the "LinearRegression(featuresCol='features', labelCol='hoursperweek')" command that we run here.  This command is not tied to a dataset, meaning that it can't possibly return a result.  It is simply an empty mathematical formula waiting for a dataset to be applied to.  However, it's still completely valid PySpark because, as we said, nothing is executed until it's needed.  Once we apply the ".fit(df_vec_train)" function, it has a dataset and can begin doing its magic.  The lazy execution model is actually more complex than this, but this is a decent example to showcase it.
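For a tiny illustration of lazy execution using the data frames we already have: the first line below defines a transformation and returns instantly without touching the cluster, while the ".count()" action is what actually triggers a Spark job.
<CODE START>

# Defining a transformation does no work on the cluster...
df_small = df_clean.filter(df_clean.age > 40).select('age', 'hoursperweek')

# ...a Spark job only runs when an action asks for output.
df_small.count()

<CODE END>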

Moving on, now that we have predictions from our linear regression model, we can calculate our evaluation metrics to see how good our model is.  BRACE YOURSELVES!
Model Evaluation
<CODE START>

from pyspark.ml.evaluation import RegressionEvaluator

reg_eval = RegressionEvaluator(predictionCol="prediction", labelCol='hoursperweek')
df_linreg_r2 = reg_eval.evaluate(df_linreg_pred, {reg_eval.metricName: "r2"})
df_linreg_rmse = reg_eval.evaluate(df_linreg_pred, {reg_eval.metricName: "rmse"})

{
  'R-Squared': df_linreg_r2
  ,'Root Mean Squared Error': df_linreg_rmse

}

<CODE END>

Wow....that is one terrible model.  If an R-Squared of 4% isn't a record low, then it must be close.  Regardless, this was a showcase of functionality given an incomplete dataset.  We were able to calculate these values using the "RegressionEvaluator()" function from the "pyspark.ml.evaluation" library.
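For completeness, the fitted "LinearRegressionModel" stored in "df_linreg" also exposes its coefficients, intercept and a training summary, which can be handy for sanity checks.  A quick sketch:
<CODE START>

# Inspect the fitted linear regression model itself.
print(df_linreg.coefficients)                  # one weight per feature in the vector
print(df_linreg.intercept)                     # the fitted intercept
print(df_linreg.summary.r2)                    # R-Squared on the training data
print(df_linreg.summary.rootMeanSquaredError)  # RMSE on the training data

<CODE END>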

Finally, we need to store these results in the Databricks database for access from Power BI.
Predictions
<CODE START>

df_linreg_pred.write.saveAsTable("Predictions")

<CODE END>

<CODE START>

%sql


SELECT * FROM Predictions LIMIT 10;

<CODE END>

Now that our data is stored in the Predictions table, we can access it from Power BI.  As in the previous post, you can find the connection walkthrough here.
Power BI Report
This quick scatterplot of our predictions versus the actual values tells the same story as our R-Squared of 4%.  If our model was perfect, these points would all fall exactly on the line.  As an aside, this line set to "y=x" is called a "Ratio Line" in Power BI.

Hopefully, this post piqued your interest in combining the massive computing power of Spark MLlib with the ease of use and reporting capabilities of Power BI.  This blog series has been very interesting to write and we've learned a ton along the way.  We're not sure what the next series will be, but we hope you'll tune in!  Thanks for reading.  We hope you found this informative.

Brad Llewellyn
Service Engineer - FastTrack for Azure
Microsoft
@BreakingBI
www.linkedin.com/in/bradllewellyn
llewellyn.wb@gmail.com

Monday, April 1, 2019

Data Science in Power BI: Databricks Spark

Today, we're going to talk about Databricks Spark within Power BI.  If you haven't read the earlier posts in this series, Introduction, Getting Started with R Scripts, Clustering, Time Series Decomposition, Forecasting, Correlations, Custom R Visuals, R Scripts in Query Editor, Python, Azure Machine Learning Studio, Stream Analytics, Stream Analytics with Azure Machine Learning Studio and HDInsight Hive, they may provide some useful context.  You can find the files from this post in our GitHub Repository.  Let's move on to the core of this post, Databricks Spark.

Databricks is a managed Spark framework, similar to what we saw with HDInsight in the previous post.  The major difference between the two technologies is that HDInsight is more of a managed provisioning service for Hadoop, while Databricks is more like a managed Spark platform.  In other words, HDInsight is a good choice if we need the ability to manage the cluster ourselves, but don't want to deal with provisioning, while Databricks is a good choice when we simply want to have a Spark environment for running our code with little need for maintenance or management.

Azure Databricks is not a Microsoft product.  It is owned and managed by the company Databricks and available in Azure and AWS.  However, Databricks is a "first party offering" in Azure.  This means that Microsoft offers the same level of support, functionality and integration as it would with any of its own products.  You can read more about Azure Databricks here, here and here.

To get started with Azure Databricks, we need to create an account.  We can do this in the Azure Portal.
Databricks Account Creation
Now that we have an account, we can access our Azure Databricks Workspace through the Azure portal.
Launch Workspace
When we navigate to our Azure Databricks resource, we simply click the "Launch Workspace" button.
Azure Databricks Portal
This takes us to the Azure Databricks Portal.  Azure Databricks has three major components that we will leverage.  First, it has a SparkSQL database behind the scenes that we can store data in and query from.  Second, it stores our code as notebooks that we can use to interactively explore.  Finally, it allows us to create clusters that quickly spin up and spin down, separately from our code.  This means that we can destroy our clusters (and stop paying for them), without losing any of our code or data.
New Cluster
Let's start by creating our cluster definition.
Cluster Definition
Azure Databricks allows us to easily create Spark clusters with the ability to auto-scale.  This is a huge differentiator from HDInsight, which typically requires us to destroy and recreate the cluster if we want to add nodes.  In our case, we just want the smallest possible cluster, as our dataset is pretty small.  The cluster creation takes a few minutes.
New Notebook
Create Notebook
Once the cluster is created, we can create a notebook and select its default language.  One of the really cool things about Databricks is that we can write multiple query languages within the same notebook.  It's extremely simple to ingest data using Scala, process it using SQL, build predictive models using R and visualize it using Python, all inside a single notebook.  The language we select when creating the notebook is simply the default language, but it can easily be overridden using "%sql", "%scala", "%python" and "%r".  There's even a "%md" command for writing formatted text using markdown.  Now that we have a cluster and a notebook, let's use Scala to query data from our blob store.
Read Avro from Blob
<CODE START>

%scala

spark.conf.set(
  "fs.azure.account.key.datascienceinpowerbibig.blob.core.windows.net", <Storage Account Access Key>
);

val df = spark.read.format("avro").load("wasbs://interactivequery@datascienceinpowerbibig.blob.core.windows.net/data");

df.show();

<CODE END>

We were able to do this in a few basic lines of code.  Now that we can access our data, let's discuss the goal of this post.  In the previous post, we mentioned that Hive doesn't perform well against lots of small files.  This is true for Spark as well.  One technique for handling this is to have jobs that coalesce a large number of small files into a single large file.  These jobs can be run daily, weekly or monthly depending on the data needs of the organization.  Since we've already stored the entire dataset in the "df" Data Frame, it's trivial to write all of this out as a set of randomly partitioned files or as a single file.
Write Avro to Blob
<CODE START>

%scala

// Write using default partitions

df.write.format("avro").save("wasbs://interactivequery@datascienceinpowerbibig.blob.core.windows.net/datadefault/")

// Write using single partition

df.repartition(1).write.format("avro").save("wasbs://interactivequery@datascienceinpowerbibig.blob.core.windows.net/datasinglefile/")

<CODE END>

Since Spark is a distributed processing framework, the default state is to read and write files in a partitioned state.  However, we can overwrite this by using the "repartition()" function.  Let's take a look at the results.
Default Partitions

Single Partition
Immediately, we notice the awkward naming convention that Spark uses for its files.  This is a built-in feature of Hadoop and cannot be directly altered.  However, it is possible to use File System commands to rename the files after they've been written, if it's necessary for some reason.  We also see that Spark includes some status files as well.  These aren't a major concern, as most Hadoop technologies know how to leverage or ignore these files as needed.
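For example, here's a rough sketch (in Python this time) of renaming the single output file using "dbutils.fs".  The folder path and target file name are just assumptions for illustration:
<CODE START>

%python

# Assumed output folder from the single-partition write above.
out_dir = "wasbs://interactivequery@datascienceinpowerbibig.blob.core.windows.net/datasinglefile/"

# Find the lone "part-" file that Spark produced...
part_file = [f.path for f in dbutils.fs.ls(out_dir) if f.name.startswith("part-")][0]

# ...and move it to a friendlier name in the same folder.
dbutils.fs.mv(part_file, out_dir + "customers.avro")

<CODE END>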

Seeing this data leads us to two important questions.  First, is the total data volume smaller if we store it in multiple files?
Storage Statistics
Fortunately, Azure Storage Explorer has some built-in functionality for this.  It turns out that we saw an 83% size reduction using the default partitioning and an 87% size reduction using the single partitioning.  More importantly, what kind of read performance do we get off of these files?
Read Statistics
This test showed that the small files took 9.37 seconds, the default partitioned files took 0.67 seconds and the single file took 0.54 seconds.  These numbers are even better than the Storage Statistics, boasting a 93% read time reduction for the default partitioning and a 94% read reduction for the single file.  What's even more interesting here is that there doesn't seem to be a substantial difference between using the default partitioning and using a single file.  So, we'll just stick with the defaults moving forward.

Now that we've compiled all of our small files together, we need to expose the data in a way that Power BI can efficiently read.  Power BI can read from Blob Storage, but it's messy and we won't try that.  Fortunately, Databricks has a built-in Spark SQL Database that we can write our data to.
createOrReplaceTempView
<CODE START>

%scala


df.createOrReplaceTempView("Customers")

<CODE END>

<CODE START>

%sql


SELECT * FROM Customers LIMIT 10

<CODE END>

Databricks allows us to leverage a Scala function "createOrReplaceTempView()" that automatically creates a temporary table for us.  Unlike in the previous post, this table is not connected directly to the data in blob storage.  Instead, it's a copy of the data that is stored directly inside the Databricks DBFS Storage.  Just like with HDInsight, it is possible to create an external table that connects to remote file storage.  In order to do this, we must first mount the storage container.
Mount Storage Account
<CODE START>

%scala

dbutils.fs.mount(
  source = "wasbs://interactivequery@datascienceinpowerbibig.blob.core.windows.net/datadefault",
  mountPoint = "/mnt/datadefault",

  extraConfigs = Map("fs.azure.account.key.datascienceinpowerbibig.blob.core.windows.net" -> <Storage Account Access Key>))

<CODE END>

This command allows us to access the data in the "datadefault" folder within the "interactivequery" container, simply by referencing the "/mnt/datadefault" location as if it were local.  We can use this to create our external table.  Understanding that it's not good practice to embed account keys directly in our code, it's also possible to store them using the Secrets API and reference them using the "dbutils.secrets.get" function.
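A rough sketch of what that might look like in Python; the secret scope name "datascience" and key name "storage-account-key" are assumptions and would need to match a scope you've actually created:
<CODE START>

%python

# Mount the same folder, but pull the account key from a secret scope
# instead of pasting it into the notebook.  Scope and key names are assumed.
dbutils.fs.mount(
  source = "wasbs://interactivequery@datascienceinpowerbibig.blob.core.windows.net/datadefault",
  mount_point = "/mnt/datadefault",
  extra_configs = {
    "fs.azure.account.key.datascienceinpowerbibig.blob.core.windows.net":
      dbutils.secrets.get(scope = "datascience", key = "storage-account-key")
  }
)

<CODE END>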

Unfortunately, while writing this, we discovered that there is a bug affecting the use of Avro for this operation.  If we use the Avro data, we end up with a strange table.
CustomersBlob (Avro)
<CODE START>

CREATE EXTERNAL TABLE CustomersBlob(

)

<CODE END>

Fortunately, saving the same data in Parquet format resolves the problem.
CustomersBlob (Parquet)
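For reference, here's a rough sketch (in Python) of the Parquet workaround.  The "datadefaultparquet" folder name and the second mount point are assumptions chosen to line up with what we use in the next post:
<CODE START>

%python

# Read the mounted Avro data and re-save it as Parquet in an assumed
# "datadefaultparquet" folder in the same storage container.
df = spark.read.format("avro").load("/mnt/datadefault")
df.write.mode("overwrite").format("parquet") \
  .save("wasbs://interactivequery@datascienceinpowerbibig.blob.core.windows.net/datadefaultparquet/")

# Assuming that folder is also mounted at /mnt/datadefaultparquet, an
# external table over the Parquet files works as expected.
spark.sql("""
  CREATE TABLE IF NOT EXISTS CustomersBlob
  USING PARQUET
  LOCATION '/mnt/datadefaultparquet'
""")

<CODE END>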
Now that this data is accessible through Databricks, we can access it from Power BI using the Spark connector.  First, we need to find the connection information.  In order to connect, we need to get the JDBC/ODBC connection information for the cluster, as well as an access token.  We won't show this process step-by-step, but you can find it here.
Customers
Hopefully, this post enlightened you to the possibilities of combining the processing power of Databricks Spark with Power BI.  Databricks is one of the coolest new kids on the block and provides an incredibly simple mechanism for implementing Python, R, Scala and SQL code against the open-source Spark framework.  Stay tuned for the final post in this series where we'll dig into Spark MLlib.  Thanks for reading.  We hope you found this informative.

Brad Llewellyn
Service Engineer - FastTrack for Azure
Microsoft
@BreakingBI
www.linkedin.com/in/bradllewellyn
llewellyn.wb@gmail.com

Monday, March 11, 2019

Data Science in Power BI: HDInsight Hive

Today, we're going to talk about HDInsight Hive within Power BI.  If you haven't read the earlier posts in this series, Introduction, Getting Started with R Scripts, Clustering, Time Series Decomposition, Forecasting, Correlations, Custom R Visuals, R Scripts in Query Editor, Python, Azure Machine Learning Studio, Stream Analytics and Stream Analytics with Azure Machine Learning Studio, they may provide some useful context.  You can find the files from this post in our GitHub Repository.  Let's move on to the core of this post, HDInsight Hive.

HDInsight is a managed Hadoop platform available on Azure.  On the back-end, HDInsight is enabled by a partnership between Hortonworks and Microsoft to bring the Hortonworks Data Platform (HDP) to Azure as a Platform-as-a-Service option.  This allows us to leverage much of the functionality of the Hadoop ecosystem, without having to manage and provision the individual components.  HDInsight comes in a number of configurations for different needs, such as Kafka for Real-Time Message Ingestion, Spark for Batch Processing and ML Services for Scalable Machine Learning.  Although most of the HDInsight configurations include Hive, we'll be using the Interactive Query configuration.  You can read more about HDInsight here.

Hive is a "SQL on Hadoop" technology that combines the scalable processing framework of the ecosystem with the coding simplicity of SQL.  Hive is very useful for performant batch processing on relational data, as it leverages all of the skills that most organizations already possess.  Hive LLAP (Low Latency Analytical Processing or Live Long and Process) is an extension of Hive that is designed to handle low latency queries over massive amounts of EXTERNAL data.  One of the coolest things about the Hadoop SQL ecosystem is that the technologies allow us to create SQL tables directly on top of structured and semi-structured data without having to import it into a proprietary format.  That's exactly what we're going to do in this post.  You can read more about Hive here and here and Hive LLAP here.

We understand that SQL queries don't typically constitute traditional data science functionality.  However, the Hadoop ecosystem has a number of unique and interesting data science features that we can explore.  Hive happens to be one of the best starting points on that journey.

Let's start by creating an HDInsight cluster in the Azure portal.
HDInsight Creation Basics
HDInsight Creation Storage
HDInsight Creation Cluster Size
When we provisioned the cluster, we connected it to an Azure Storage account named "datascienceinpowerbibig".  This is important because HDInsight clusters have two types of storage available, internal and external.  Internal storage is provisioned with the cluster and is deleted when it is deprovisioned.  External storage is provisioned separately and remains after the cluster is deprovisioned.  Additionally, the external storage is easily accessible by other Azure resources.  It's also important to note that we opened up the advanced options and reduced the cluster size to be as small as possible.  Even then, this cluster costs $2.75 per hour, which adds up to $2,046 per month.  This price tag is why many organizations provision HDInsight clusters only when they need them, then deprovision them when they are done.

Now that our cluster is provisioned, let's take a look at the Blob store where we pointed our cluster.
HDInsight Storage
We see that the HDInsight cluster has created a number of directories that it uses to store its data and metadata.  In addition to the automatically created files, we added a "data" directory and added a bunch of data files to it.
HDInsight Data
It's important to note that these files are all very small.  Hive is known to perform better on a small number of large files than it does on a large number of small files.  So, the data we have here is not optimal for querying.  Perhaps this is a good topic for a later post (WINK, WINK).  It's also important to note that these files are stored in Avro format.  Avro is one of the recommended formats for storing data that is to be queried by Hive.  You can read more about Avro here and Hive Data Formats here.

Now that we've seen our raw data, let's navigate to the SQL Interface within HDInsight.
Ambari
Many of the administrative interfaces and dashboards can be found within Ambari, which can be accessed from the HDInsight resource blade in the Azure portal.
Hive View
In Ambari, we can access the "Hive View 2.0" by clicking the waffle in the top-right corner of the window.
Blank Query
The Hive View takes us to a blank query window.  This is where we could write our queries.  For now, we're more interested in creating a table.  We could do this with SQL commands, but the Hive View has a nice GUI for us to use as well.
New Table
We can access this GUI by clicking the "+New Table" button in the top-right corner of the window.
Columns
We start by defining all of the columns in our dataset.
Advanced
Then, we can go to the Advanced tab to supply the location of the data and the file formats.  Since our HDInsight cluster is directly connected to the Azure Storage Account, we don't have to worry about defining complex path names.  The cluster can access the data simply by using the relative path name of "/data/".

Now that we've created our table, we can go back to the Query editor and query it.
Customers
We also have the ability to see the code that was generated to create the table by navigating to the Tables tab.
DDL
Now that we've taken a look at our table, let's get to the Power BI portion.  One of the really cool things about Hive is that it exposes our data using the same SQL drivers that we are used to.  This means that most tools can easily access it, including Power BI.
Get Data
Power BI Report

Once we connect to the table, we can use the same Power BI functionality as always to create our data visualizations.  It's important to note that there are substantial performance implications of using "DirectQuery" mode to connect to Hive LLAP clusters.  Hive LLAP is optimized for this type of access, but it's not without complexity.  Many organizations that leverage this type of technology put a substantial amount of effort into keeping the database optimized for read performance.

Hopefully, this post opened your eyes a little to the power of leveraging Big Data within Power BI.  The potential of SQL and Big Data is immense and there are many untapped opportunities to leverage it in our organizations.  Stay tuned for the next post where we'll be looking into Databricks Spark.  Thanks for reading.  We hope you found this informative.

Brad Llewellyn
Service Engineer - FastTrack for Azure
Microsoft
@BreakingBI
www.linkedin.com/in/bradllewellyn
llewellyn.wb@gmail.com

Monday, February 18, 2019

Data Science in Power BI: Stream Analytics and Azure Machine Learning Studio

Today, we're going to talk about combining Stream Analytics with Azure Machine Learning Studio within Power BI.  If you haven't read the earlier posts in this series, Introduction, Getting Started with R Scripts, Clustering, Time Series Decomposition, Forecasting, Correlations, Custom R Visuals, R Scripts in Query Editor, Python, Azure Machine Learning Studio and Stream Analytics, they may provide some useful context.  You can find the files from this post in our GitHub Repository.  Let's move on to the core of this post, Stream Analytics and Azure Machine Learning Studio.

This post is going to build directly on what we created in the two previous posts, Azure Machine Learning Studio and Stream Analytics.  As such, we recommend that you read them before proceeding.

In our previous post on Azure Machine Learning Studio, we built a basic predictive model that predicts the income of new customers based on their demographic information.  This predictive model is currently exposed to us as a web service.  In the previous post on Stream Analytics, we built a basic data stream of new customers to show how the Power BI Service can be leveraged to visualize streaming data.  Obviously, the next step here is to combine these two features to create a data stream that predicts Income in real-time.  We start by opening up our existing Stream Analytics query in the Azure Portal.
Stream Analytics Query
We see that this query simply passes through the data with no additional transformations.  So, how do we leverage Azure Machine Learning Studio here?  There's one more element in the "Job Topology" pane that we haven't mentioned yet, Functions.
Create Azure ML Function
DataScienceInPowerBI Function
By navigating to the "Functions" pane and selecting "Add -> Azure ML", it's extremely easy to create an Azure ML function that connects to our existing web service.  It's also possible to create Javascript UDFs and UDAs, but those are topics for another post.
Function Details
Once we've created the function, we can open it again to see the details.  Specifically, this allows us to see exactly what fields are expected by the web service.  At this point, it's important to note that this web service requires us to pass in the "id" and "income" fields.  We left these in because they open up some interesting possibilities for analysis or visualizations that we may use at some point.  In production use cases, we recommend removing these.  Next, we need to alter our query to leverage this new function.
Updated Query
<CODE START>


WITH T AS (
SELECT *
,DataScienceInPowerBI(
1, [age], [workclass], [fnlwgt], [education], [education-num]
,[marital-status], [occupation], [relationship], [race], [sex]
,[capital-gain], [capital-loss], [hours-per-week], [native-country]
,''
) AS [PredictedIncomeRecord]
FROM [CustomersSASource]
)

SELECT *
,[PredictedIncomeRecord].[Scored Labels] AS PredictedIncome
,[PredictedIncomeRecord].[Scored Probabilities] AS ScoredProbability
INTO [CustomersStream]
FROM T

<CODE END>

As it turns out, the "DataScienceInPowerBI" function returns a value of type "record".  This means that we can't simply call the function and pass the results to Power BI.  So, we needed to make a CTE to capture the result in the "PredictedIncomeRecord" field.  Then, we pulled out the "Scored Labels" and "Scored Probabilities" fields in the final SQL statement.  It's important to note that we hard-coded 1 and '' (empty string) into the function call as our values for "id" and "income", as these fields do not exist in our data stream.  As we mentioned earlier, these fields should typically be removed in production scenarios.

As a side note, testing functions can be very difficult in Stream Analytics if you are using a Power BI output sink.  As we were developing this, we used a temporary blob storage container as our output sink, which allowed us to see the results of our queries in raw JSON format.  This is the recommended approach when developing more complex Stream Analytics queries.

Finally, let's clean up from our previous runs and start the stream with fresh data.  This will give us a dashboard that shows real-time streaming predictions.
Streaming Dashboard with Predictions
Hopefully, this post opened your eyes a little to the possibilities of combining streaming data and machine learning within Power BI.  There's a tremendous amount of content here that we can't possibly cover in this blog.  Feel free to check it out on your own to see how it can help you solve some of the problems you're currently facing.  Stay tuned for the next post where we'll dig into the world of Big Data with HDInsight Hive.  Thanks for reading.  We hope you found this informative.

Brad Llewellyn
Service Engineer - FastTrack for Azure
Microsoft
@BreakingBI
www.linkedin.com/in/bradllewellyn
llewellyn.wb@gmail.com

Monday, January 28, 2019

Data Science in Power BI: Stream Analytics

Today, we're going to talk about Stream Analytics within Power BI.  If you haven't read the earlier posts in this series, Introduction, Getting Started with R Scripts, Clustering, Time Series Decomposition, Forecasting, Correlations, Custom R Visuals, R Scripts in Query Editor, Python and Azure Machine Learning Studio, they may provide some useful context.  You can find the files from this post in our GitHub Repository.  Let's move on to the core of this post, Stream Analytics.

Stream Analytics is an Azure service for processing streaming data from Event Hub, IoT Hub or Blob Storage using a familiar SQL-like language known as "Stream Analytics Query Language".  This language allows us to interpret JSON, Avro and CSV data in a way that will be very familiar to SQL developers.  It is also possible to aggregate data using varying types of time windows, as well as join data streams to each other or to other reference sets.  Finally, the resulting data stream can be output to a number of different destinations, including Azure SQL Database, Azure Cosmos DB and Power BI.

We understand that streaming data isn't typically considered "Data Science" by itself.  However, it's often associated with data science workloads, and setting up this background now opens up some cool applications in later posts.  For this post, we'll cover how to sink streaming data to Power BI using Stream Analytics.

The previous posts in this series used Power BI Desktop for all of the showcases.  This post will be slightly different in that we will leverage the Power BI Service instead.  The Power BI Service is a collaborative web interface that has most of the same reporting capabilities as Power BI Desktop, but lacks the ability to model data at the time of writing.  However, we have heard whispers that data modeling capabilities may be coming to the service at some point.  The Power BI Service is also the standard method for sharing datasets, reports and dashboards across organizations.  For more information on the Power BI Service, read this.

In order to leverage Stream Analytics, we obviously need to start by creating a source of streaming data.  In order to do this, we'll leverage some sneaky functionality with Azure SQL Database and Azure Data Factory (ADF).  Basically, we want to trigger an ADF Pipeline every minute that pulls between 1 and 5 random records from our database, placing these records in Blob Storage.  We'll pretend that this is our "stream".  Stream Analytics will then pick up these records, process them and sink them to Power BI.  The flow should look like this:
Customer Stream
This is going to be an extension of the "Customers" tables we created in the previous post.  We'll start by creating a SQL Stored Procedure called "Customers_ADFSource" that pulls between 1 and 5 random records from the "Customers_New" table.

<CODE START>

CREATE PROCEDURE Customers_ADFSource AS
BEGIN

DECLARE @records INT = ( SELECT CEILING( RAND() * 4 ) + 1 )

BEGIN
WITH T AS (
SELECT
ROW_NUMBER() OVER ( ORDER BY NEWID() ) AS [rownumber]
,NEWID() AS [customerid]
,[age]
,[workclass]
,[fnlwgt]
,[education]
,[education-num]
,[marital-status]
,[occupation]
,[relationship]
,[race]
,[sex]
,[capital-gain]
,[capital-loss]
,[hours-per-week]
,[native-country]
FROM Customers_New
)

SELECT
[customerid]
,[age]
,[workclass]
,[fnlwgt]
,[education]
,[education-num]
,[marital-status]
,[occupation]
,[relationship]
,[race]
,[sex]
,[capital-gain]
,[capital-loss]
,[hours-per-week]
,[native-country]
FROM T
WHERE [rownumber] < @records
END

END

<CODE END>

EDIT: After writing this, we later found that this could also be achieved by using something similar to "SELECT TOP (@records) * FROM [Customers_New]".  The parentheses are extremely important as this will not work without them.


Customers_ADFSource
Notice that we aren't bringing over the "Income" column.  This will become important in the next post.  We're also assigning new GUIDs to each record so that we can pretend that these are always new customers, even if they happen to have the exact same attributes as a previous customer.

Next, we want to create an ADF Pipeline that executes the "Customers_ADFSource" Stored Procedure and loads the data into the "customerssasource" Blob Container.  Showcasing this would take way too many screenshots for us to add here.  Feel free to take it on as an exercise for yourselves, as it's not terribly complicated once you have the SQL Stored Procedure created.
customerssasource
<OUTPUT START>

{"customerid":"5c83d080-d6ba-478f-b5f0-a511cf3dee48","age":56,"workclass":"Private","fnlwgt":205735,"education":"1st-4th","education-num":2,"marital-status":"Separated","occupation":"Machine-op-inspct","relationship":"Unmarried","race":"White","sex":"Male","capital-gain":0,"capital-loss":0,"hours-per-week":40,"native-country":"United-States"}

{"customerid":"6b68d988-ff75-4f63-b278-39d0cf066e96","age":25,"workclass":"Private","fnlwgt":194897,"education":"HS-grad","education-num":9,"marital-status":"Never-married","occupation":"Sales","relationship":"Own-child","race":"Amer-Indian-Eskimo","sex":"Male","capital-gain":6849,"capital-loss":0,"hours-per-week":40,"native-country":"United-States"}


<OUTPUT END>

We see that the ADF pipeline is placing one file containing between 1 and 5 customer records in the blob container every minute.  Now that we have emulated a stream of Customer data, let's use Stream Analytics to process this stream and sink it to Power BI.  We start by creating an empty Stream Analytics job.  This is so simple that we won't cover it in this post.
After it's created, we see that we have four different components in the "Job Topology" section of the Stream Analytics blade.  We'll start by creating an input that reads data from our Blob Container.
Create Blob Input
CustomersSASource (SA)


We have called this input "CustomersSASource".  Next, we want to create a stream output that sinks to Power BI.
Create Power BI Output
CustomersStream
Now that we have an input and an output, we need to create a query.  As we mentioned earlier, the Stream Analytics Query Language is a SQL-like language that makes it relatively painless to access Stream Analytics inputs that may be stored in semi-structured formats, such as CSV, JSON and AVRO.  In our case, we'll just use "SELECT *" to pass through all of the data.
Query
While writing this, we discovered that Stream Analytics will allow us to create inputs and outputs with underscores ( _ ) in their names, but the Query Editor considers them to be syntax errors.  This is an odd compatibility issue that should be noted.

Now, the last step is to go to the Power BI Portal and navigate to the workspace where we stored our dataset.
My Workspace
By clicking on the "CustomersStream" dataset, we can create a simple report that shows us the total number of Customers that are streaming into our dataset.
CustomersStreamReport
While this functionality is pretty cool, the Power BI Service still has a long way to go in this respect.  Additional effort is needed to make the report always show a relative time frame, like the last 30 minutes.  Reports also don't seem to automatically refresh themselves in any way.  When we wanted to see the most recent data from the stream, we had to manually refresh this report.  This issue also occurs if we pin a live report to a dashboard.  However, we found that individually pinning the visuals to the dashboard allows them to automatically update.
CustomersStreamDashboard
The left side of the dashboard shows a live report tile, while the right side of the dashboard shows a set of pinned visuals.  As for our "last 30 minutes" issue mentioned earlier, we can work around this by leveraging Custom Streaming Dashboard Tiles.  These tiles provide a very limited set of functionality for viewing live streaming data from a dashboard.  You can read more about them here.
Custom Streaming Dashboard Tiles
Hopefully, this post enlightened you to some of the ways that Power BI can be used to consume streaming data.  The use cases for streaming data are constantly evolving, as there seem to be new opportunities every day.  Stay tuned for the next post where we'll add some Azure Machine Learning Studio integration into our Stream Analytics job.  Thanks for reading.  We hope you found this informative.

Brad Llewellyn
Service Engineer - FastTrack for Azure
Microsoft
@BreakingBI
www.linkedin.com/in/bradllewellyn
llewellyn.wb@gmail.com