Computing Platform (4): ETL Processes with Spark and Databricks

Hi all,

In this post we summarize the main steps to follow when creating an ETL process in our Computing Platform.

What is ETL?


It stands for Extract, Transform, Load. It is a term commonly used for operational processes that run out of business hours to transform data into a different format, generally ready to be consumed by other applications like Business Intelligence tools, reporting apps, dashboards, visualizations, etc.



The ETL concept is well known and a detailed explanation is out of the scope of this post. However, there are several aspects worth highlighting:

  1. The Extraction phase can use one or several data sources in different formats and storage types like file systems, databases, data lakes, etc. The format of the files or data can vary a lot: JSON, CSV, Avro, etc.

  2. The Transformation phase varies according to the chosen destination storage, the intended formats and the expected data. The generation of statistics, aggregations, the inclusion of relationships, etc. are usually involved in this phase.

  3. The Load phase normally refers to a single data storage destination, though it can target several. The type of storage is normally chosen so it can be easily queried and accessed by the client apps (which we could call downstream consumers). Typical targets are Data Warehouses, databases (relational, NoSQL), etc. A minimal sketch of the three phases is shown right after this list.
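
To make the three phases concrete, here is a minimal, purely illustrative sketch in Spark (the paths, column names and aggregation are hypothetical assumptions, not our actual process):

import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().appName("minimal-etl-sketch").getOrCreate()

// EXTRACT: read raw JSON events from a hypothetical data lake path
val rawEvents = spark.read.option("multiLine", true).json("/mnt/datalake/raw/events")

// TRANSFORM: aggregate events per type and day (illustrative only)
val dailyCounts = rawEvents
  .groupBy(col("eventType"), to_date(col("timestamp")).as("day"))
  .count()

// LOAD: write the curated result to a queryable store (Parquet in this sketch)
dailyCounts.write
  .mode(SaveMode.Overwrite)
  .parquet("/mnt/datalake/curated/daily_event_counts")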


Why Spark for ETL Processes?


Spark offers parallelized programming out of the box. That makes it ideal for ETL processes, which, like most Big Data workloads, handle huge amounts of data.

Parallelization is a great advantage the Spark API offers to programmers.
As a programmer you should use the Spark API (from Java, Scala, Python or R) as much as you can to take advantage of Spark's clustered architecture and its parallelization features. By doing so you'll give a boost to the performance of your applications.

val rddData = sc.parallelize(eventsAsJSONList)
val resultDF = spark
  .read
  .option("multiLine", true)
  .json(rddData)

In the example above a local collection (a Scala Sequence in this case) is turned into a distributed dataset (an RDD) and will therefore be processed in parallel by default.

Parallelization with no extra effort is an important factor, but Spark offers much more: an amazing API and a set of capabilities, from basic parallel programming to graphs, machine learning, etc., that make it the main framework in our stack.

 

Why Databricks?


Well, we use Azure Databricks as our main platform for Big Data and parallel processing. Databricks jobs really fit ETL, as they can be scheduled to run at a given frequency as periodic batch jobs. Let's first summarize the pros and cons we've found with this approach (batch jobs) for ETL:

Pros:

  1. We do not need a cluster running 24x7, which is important when our resources are limited.

  2. You can re-use a production cluster, for instance by running the job out of business hours.


Cons:

  1. Latency. With this approach you have to wait until the job has been executed to have the most recent results.

  2. Because of point 1, no real-time information is available.


I know, batch jobs are the old way. In the end it depends on whether you really want to run the process at a specific frequency or you need a continuous transformation because you cannot wait hours to feed your downstream consumers. There are options based on streaming (e.g. real-time streaming ETL with Structured Streaming), as sketched below. In our case the real-time streaming approach was not the most appropriate option, as we had no real-time requirements. We'll talk about real-time ETL in a future post as an evolution of the process described here.
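
For reference, a streaming-based version of the same idea with Structured Streaming could look roughly like this (a sketch only, assuming a SparkSession named spark is in scope; the schema, paths and trigger interval are hypothetical):

import org.apache.spark.sql.types._
import org.apache.spark.sql.streaming.Trigger

// Hypothetical event schema; file-based streaming sources require an explicit schema.
val eventSchema = new StructType()
  .add("eventId", StringType)
  .add("eventType", StringType)
  .add("payload", StringType)

val streamingEvents = spark.readStream
  .schema(eventSchema)
  .json("/mnt/datalake/events/incoming")   // hypothetical input directory

streamingEvents.writeStream
  .format("parquet")
  .option("checkpointLocation", "/mnt/datalake/events/_checkpoints")   // hypothetical
  .option("path", "/mnt/datalake/events/curated")                      // hypothetical
  .trigger(Trigger.ProcessingTime("10 minutes"))
  .start()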

 

Structure of a Spark ETL Process for Databricks


This section covers the definition of a Spark driver application containing a scheduled ETL process, how the project is arranged, what tests have been considered and the SDLC applied for Delivery, considering the application has to be attached to a Databricks job.
A JAR-based Spark application is not better or worse than Databricks notebooks or Python apps. It is just another approach; the pros and cons are different and we should adapt to each case.

A Spark application can be a JAR file (Java/Scala), a notebook or a Python application. This time I've chosen the JAR file. Why? Well, a notebook is clearly tied to Databricks. Given the importance of the process, I wanted the flexibility to use a different Spark cluster if needed, for instance by submitting the JAR app to a Spark cluster not managed by Databricks. In that scenario the JAR approach would only require some small changes to work, which is not the case for notebooks, which require the Databricks runtime.
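
For illustration, submitting such a JAR to a Spark cluster outside Databricks would look roughly like this (the master URL, deploy mode and assembly file name are hypothetical; the main class is the one declared in jobDescriptor.conf later in this post):

$ spark-submit \
    --class com.fts.cp.etl.events.EventsETL \
    --master spark://your-master-host:7077 \
    --deploy-mode cluster \
    target/scala-2.11/etl-events-assembly-1.0.0.jar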

The ETL Plan


Well, first of all we have to design the ETL plan: basically, the sequence of actions to carry out, where and how. Our use case is simple: some handling of an event store in an Event Sourcing system to make the data from the events consumable by visualization and analytics tools.


Spark in Databricks


Anyway, the default option is to use a Databricks job to manage our JAR app. So, there are several important points to highlight first:

SparkContext and SparkSession


Consider that the app will run in a Databricks Spark cluster. So, there are some rules to follow when creating the SparkSession and SparkContext objects.

First of all, declare the Spark dependencies as Provided:
val sparkVersion = "2.4.0"

libraryDependencies += "org.apache.spark" %% "spark-core" % sparkVersion % Provided
libraryDependencies += "org.apache.spark" %% "spark-sql" % sparkVersion % Provided
libraryDependencies += "org.apache.spark" %% "spark-catalyst" % sparkVersion % Provided

Secondly, because Databricks is a managed service, some code changes may be necessary to ensure that the Spark job runs correctly. A JAR-based job must use the shared SparkContext API to get the object. Because Databricks initializes the SparkContext, programs that invoke a new context will fail. To get the SparkContext, use only the shared SparkContext provided by Databricks:
import org.apache.spark.sql.SparkSession

trait SparkSessionWrapper {

  lazy val spark: SparkSession = {
    SparkSession
      .builder()
      .appName("etl event store")
      .config("spark.sql.files.ignoreCorruptFiles", true)
      .getOrCreate()
  }
}




There are some pieces of advice we should follow when using the shared Databricks SparkContext if we do not want our job to fail:

First, do not manually create a SparkContext object using the constructor:

// Do NOT do this in a Databricks job: the platform already provides the context.
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("AppJob").setMaster("local"))


Secondly, do not stop the SparkContext in the JAR application:

// Do NOT do this: Databricks manages the lifecycle of the shared context.
val sc = SparkContext.getOrCreate()
sc.stop()



Finally, do not call System.exit(0) or sc.stop() at the end of your Main method in the application. This can cause undefined behavior.



Spark Cache


Spark offers a native in-memory cache in its API. However, it is important to know how caching works in Spark. For instance, the Databricks IO cache supports reading Parquet files from DBFS, Amazon S3, HDFS, Azure Blob Storage, and Azure Data Lake. It does not support other storage formats such as CSV, JSON, and ORC. Read this resource for more information about caching with Databricks.
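
As a quick illustration of the difference (the path is hypothetical, and the configuration key shown is the Databricks-specific setting for the IO cache, normally enabled at cluster level):

// Native Spark cache: works for any DataFrame, regardless of the source format.
val events = spark.read.parquet("/mnt/datalake/curated/events")   // hypothetical path
events.cache()    // marks the DataFrame for caching; materialized on the first action
events.count()    // triggers the actual caching

// Databricks IO cache: enabled per cluster (or via configuration) and it only
// accelerates Parquet reads from the storage systems listed above.
// spark.conf.set("spark.databricks.io.cache.enabled", "true")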

Databricks File System


Regarding the Databricks File System (DBFS), it cannot be used from a JAR application, as it is available only for notebooks for now. The official answer is:

Unfortunately, not yet. We do not have a way to link a jar against the dbutils library yet. However, DBFS just ultimately reads/writes data either from S3 or file system on the Spark cluster. So in your SBT project, you'll need to just directly use the S3 library API or the local file system libraries.

Which is actually a shame. We talked in a previous post of this Techblog about how to map directories in an Azure Data Lake to a mount point in DBFS. Unfortunately, that approach is valid only for Databricks notebooks.
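
As a rough sketch of the workaround for a JAR app (assuming ADLS Gen1 accessed with an OAuth service principal; the configuration keys, credentials and adl:// path below are illustrative and depend on your storage generation and connector version), the data can be read directly instead of going through a DBFS mount:

// Hypothetical service principal credentials; in practice, read them from a secret store.
val clientId     = "<application-id>"
val clientSecret = "<service-credential>"
val tenantId     = "<directory-id>"

spark.conf.set("dfs.adls.oauth2.access.token.provider.type", "ClientCredential")
spark.conf.set("dfs.adls.oauth2.client.id", clientId)
spark.conf.set("dfs.adls.oauth2.credential", clientSecret)
spark.conf.set("dfs.adls.oauth2.refresh.url",
  s"https://login.microsoftonline.com/$tenantId/oauth2/token")

// Read directly from the Data Lake instead of a /mnt/... DBFS mount point.
val rawEvents = spark.read
  .option("multiLine", true)
  .json("adl://youraccount.azuredatalakestore.net/events/raw/")   // hypothetical path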

 

ETL Process Definition: Pipelines


What are Spark pipelines? They are basically sequences of transformations on data using immutable, resilient datasets (RDDs) in different formats. The source data in pipelines covers structured or unstructured types like JDBC, JSON, Parquet, ORC, etc. (for instance, an Azure Data Lake storing Avro files with JSON content), while the output is normally integrated, structured and curated, ready for further processing, analysis, aggregation and reporting. Just an example:
val rddJSON = spark
  .read
  .option("multiLine", true)
  .json(rddJSONContent)           // EXTRACTION
  .map(eb => transformEvent(eb))  // TRANSFORMATION
  .write
  .mode(SaveMode.Append)
  .cosmosDB(loadConfigMap)        // LOADING

Where the constant rddJSONContent is an RDD extracted from JSON content.

Pipelines are a recommended way of processing data in Spark, in the same way as, for instance, Machine/Deep Learning pipelines.

Project Layout


The structure of the project for a JAR-based Spark app is the regular one used with Scala/SBT projects.



Some notable features of this layout are:

  • SparkSession wrapper (please remember what was said in the Spark in Databricks section above)


import org.apache.spark.sql.SparkSession

trait SparkSessionWrapper {

  lazy val spark: SparkSession = {
    SparkSession
      .builder()
      .appName("YOUR APP NAME HERE")
      .config("spark.sql.files.ignoreCorruptFiles", true)
      .getOrCreate()
  }
}


  • Spark Test components


libraryDependencies += "com.github.mrpowers" % "spark-fast-tests_2.11" % "2.3.0_0.11.0" % Test


  • SCA (Static Code Analysis) descriptor file (sonar-project.properties)


# Project details
sonar.projectKey=${YOUR PROJECT KEY HERE}
sonar.projectName=${YOUR PROJECT NAME HERE}

# Sonar will start looking for files in this directory
sonar.language=scala
sonar.sources=src
sonar.exclusions=${YOUR EXCLUDED PATHS HERE}
sonar.java.source=8
sonar.java.binaries=target/scala-2.11/classes
# Url to sonar server
sonar.host.url=${YOUR URL HERE}
sonar.scoverage.reportPath=target/scala-2.11/scoverage-report/scoverage.xml

#api-endpoint.sonar.sources=src
sonar.projectVersion=1.0.0-SNAPSHOT


  • Coverage plugins (plugins.sbt)


addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.5.1")


Testing


Dependencies

Really simple: just ScalaTest and spark-fast-tests.
libraryDependencies += "com.github.mrpowers" % "spark-fast-tests_2.11" % "2.3.0_0.11.0" % Test
libraryDependencies += "org.scalatest" %% "scalatest" % "3.0.1" % Test

Build Time Tests

We call build-time tests those tests that are executed during the build/packaging process:

  • Unit Test

  • Integration Test

  • In-Container Test


Only Unit and Integration tests are applicable here given we do not use any application server or servlet container as our run-time.

After Deployment Tests

We understand after-deployment tests as the tests that are executed in a specific stage (Beta, Candidate) once the component has already been built and deployed. For instance:

  • Functional Test

  • Load Test

  • Stress Test

  • Penetration Test


Only Functional and Load tests (based on the amount of source data) are applicable in the ETL case.

Coverage report

The coverage plugin for SBT allows us to easily generate the coverage report for build-time tests. The coverage report can be found as an HTML file in the target directory:
$ sbt clean coverage test
$ sbt coverageReport



SparkSession Test Wrapper

Use a specific SparkSession wrapper for test purposes:
import org.apache.spark.sql.SparkSession

trait SparkSessionTestWrapper {

  lazy val spark: SparkSession = {
    SparkSession
      .builder()
      .master("local")
      .appName("spark test")
      .getOrCreate()
  }
}
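
To give an idea of how these pieces fit together, here is a minimal build-time test sketch using the wrapper above with ScalaTest and spark-fast-tests (the DataFrame and the filter are hypothetical and stand in for the real, event-specific transformation logic):

import org.apache.spark.sql.functions.col
import org.scalatest.FunSuite
import com.github.mrpowers.spark.fast.tests.DataFrameComparer

class EventTransformationSpec
    extends FunSuite
    with SparkSessionTestWrapper
    with DataFrameComparer {

  import spark.implicits._

  test("only valid events are kept") {
    val sourceDF = Seq(("e1", true), ("e2", false)).toDF("id", "valid")

    // Hypothetical transformation standing in for the actual ETL logic.
    val actualDF   = sourceDF.filter(col("valid"))
    val expectedDF = Seq(("e1", true)).toDF("id", "valid")

    assertSmallDataFrameEquality(actualDF, expectedDF)
  }
}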

Isolation of Tests?

For Azure managed services we use some mocks and test services for integration. The policies for testing against Cloud IT are usually flexible, and probably the best approach is to find a trade-off between isolation and real integration.


The Databricks Job


We have to consider how the Spark application will be packaged, tested, deployed and tested again, while keeping the version number increasing, submitting it to an SCA server for quality monitoring and so on. We also have to tell the Delivery pipeline what the role of the Spark app is and how it should be handled and deployed.

To meet all these requirements we use a description of the target job for the Continuous Delivery pipeline. It is contained in a specific file, jobDescriptor.conf:
job {
  name = "{YOUR JOB NAME HERE}"
  scope = "OPERATIONAL_CYCLIC"   // BUSINESS_LOGIC | OPERATIONAL_CYCLIC | EVENTUAL
  execution {
    dependencies = []
    mainClass = "com.fts.cp.etl.events.EventsETL"
    continuous = false
    eventual = false
    schedule {
      enabled = true
      timezone = "Europe/London"
      frequency = "0 0 3 * * ?"   // Quartz cron expression: every day at 03:00
    }
    clusterType = "ETL"      // LIVE | ETL
    priority = "MEDIUM"      // HIGH | MEDIUM | LOW
  }
}

It is really simple and the properties are self-explanatory:

  • Name: The name of the Databricks job attached to the Spark app.

  • Scope: The working area of the app, i.e. whether it is related to some business logic, is part of the platform (cross-tenant) or depends on another process.

  • Execution: These properties include information about the type of execution (continuous, scheduled, eventual) and related data like the frequency (a cron expression). The clusterType tells the pipeline what cluster to attach to the job, while the priority applies the policies for busy clusters. The dependencies section includes the Maven coordinates of the dependencies the app needs to find in the Databricks Spark cluster (the ones tagged as Provided).


The purpose of this file is to tell the Delivery Platform pipeline to take care of the existence of the Databricks job and keep it updated according to the information in the descriptor.

 

Some Tips (Azure specific)


Some transitive dependencies can collide when using Azure SDK libs or client libs. A couple of examples:

1-Issues with Jackson Core. Include this code for the Azure dependencies in the build.sbt file.
libraryDependencies += "com.microsoft.azure" % "azure-data-lake-store-sdk" % "2.3.1" exclude("com.fasterxml.jackson.core", "jackson-core")
libraryDependencies += "com.microsoft.azure" % "azure-cosmosdb-spark_2.3.0_2.11" % "1.2.2" exclude("com.fasterxml.jackson.core", "jackson-core")

2-Possible issues with Guava. In this case you can override the version to use with your Spark version:
dependencyOverrides += "com.google.guava" % "guava" % "15.0"

A prototype for ETL


Please find it here.

Conclusions



  • Spark offers an excellent platform for ETL.

  • Databricks is flexible enough regarding Spark apps and formats, although we have to keep some important rules in mind. Even so, some constraints apply to JAR-based Spark apps, like the limited access to DBFS.

  • Spark transformation pipelines are probably the best approach for ETL processes although it depends on the complexity of the Transformation phase.

  • Real-time streaming and batch jobs are still the main approaches when we design an ETL process. Which one is best depends on our requirements and resources.

  • Tests are an essential part of all apps and Spark apps are no exception. Aim for the highest possible test coverage and include all types of tests (build-time and after-deployment).

  • Keep in mind the SDLC process for your Spark apps. It is really important to achieve Continuous Delivery with these components, taking advantage of their small size and flexibility in the Databricks universe, from packaging and testing to the final deployment and attachment to a Databricks job.

  • Azure SDK and client libraries have to improve a lot to be used more seamlessly. They still give us too many issues.
