Delta Lake - ACID Transactions
I’m very excited to write this blog post on how Delta Lake brings ACID transactions to Apache Spark. Apache Spark is a data processing framework that can quickly perform analytical workloads on very large data sets in your data lake. Data lakes typically have multiple data pipelines reading and writing data concurrently. Because Apache Spark does not provide transactions on its own, data engineers have to go through a tedious process to ensure data integrity. Let’s explore how to achieve ACID transactions with the help of Delta Lake.
Set up Apache Spark with Delta Lake
In order to follow along, you will first need to download Spark 3.1.1 from here. All of our examples will use the Spark Scala Shell.
Extract the Spark archive
tar -xzvf spark-3.1.1-bin-hadoop3.2.tgz
Set the SPARK_HOME environment variable. Below is an example where Spark 3.1.1 was unpacked in /Users/nickdala/stuff
export SPARK_HOME="/Users/nickdala/stuff/spark-3.1.1-bin-hadoop3.2"
Add the Spark bin directory to your PATH variable
export PATH=$SPARK_HOME/bin:$PATH
Verify that the prerequisite steps have been executed correctly
which spark-submit
You should see spark-submit located in your SPARK_HOME directory
/Users/nickdala/stuff/spark-3.1.1-bin-hadoop3.2/bin/spark-submit
Spark Shell
Spark provides an interactive shell, which allows us to prototype operations quickly. All of the examples will be coded in Scala. Let’s begin by executing the following command to start spark-shell with Delta Lake.
spark-shell --packages io.delta:delta-core_2.12:0.8.0 --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
You should see the following.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.1.1
      /_/
Using Scala version 2.12.10 (OpenJDK 64-Bit Server VM, Java 11.0.7)
Type in expressions to have them evaluated.
Type :help for more information.
scala>
ACID Properties
In this post, we will cover two of the ACID properties: atomicity and consistency. Isolation and durability will be covered in a follow-up post.
Atomicity guarantees that each transaction is treated as a single “unit”, which either succeeds completely or fails completely. Consistency ensures that the data is always in a valid state.
Spark Save operations
The Spark DataFrame save operation can optionally take a SaveMode parameter that specifies how to handle existing data if present. It’s important to realize that these save modes do not utilize any locking and are not atomic. This means that a failed job might leave an incomplete file.
The documentation goes on further to state, "Additionally, when performing an Overwrite, the data will be deleted before writing out the new data."
In other words, a failed job may remove old data and leave corrupt data.
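For reference, here is a minimal sketch of how a save mode is supplied to the DataFrame writer (you can paste it into spark-shell with :paste). The path out/example is just a placeholder for this illustration.
import org.apache.spark.sql.SaveMode

val df = spark.range(10).toDF("id")

// The save mode only controls how existing data at the target path is handled.
df.write.mode(SaveMode.ErrorIfExists).csv("out/example") // default: fail if the path already exists
df.write.mode(SaveMode.Append).csv("out/example")        // add new files next to the existing ones
df.write.mode(SaveMode.Overwrite).csv("out/example")     // delete the existing data, then write the new data
df.write.mode(SaveMode.Ignore).csv("out/example")        // silently do nothing if data already exists
None of these modes takes a lock on the target path, which is exactly why the failure scenarios below are possible.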
In reality, the Spark DataFrame writer API internally provides some degree of atomicity. This is illustrated below using the append save operation. We will also see how Spark fails to provide atomicity or consistency during an overwrite save operation.
Atomicity and Consistency for Append Operation
Let’s start by writing a Spark job that creates 100 records and saves them to a file. Using the spark-shell that we launched earlier, copy the following code.
scala> spark.range(100)
         .repartition(1)
         .write
         .mode("overwrite")
         .csv("out/test-1")
Verify that the output directory ‘out/test-1’ was created and that it contains 100 records.
➜ ls -l out/test-1
total 8
-rw-r--r-- 1 - - 0 Apr 5 23:33 _SUCCESS
-rw-r--r-- 1 - - 290 Apr 5 23:33 part-00000-9576ded5-bb45-4a7d-859d-58d187fe607c-c000.csv
scala> spark.read.csv("out/test-1").show()
+---+
|_c0|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
|  5|
|  6|
|  7|
|  8|
|  9|
| 10|
| 11|
| 12|
| 13|
| 14|
| 15|
| 16|
| 17|
| 18|
| 19|
+---+
only showing top 20 rows
scala> spark.read.csv("out/test-1").count()
res10: Long = 100
Now let’s write a job that appends 100 records to the same output directory but fails partway through.
scala> scala.util.Try(spark.range(100).repartition(1).map { i =>
         if (i > 50) {
           Thread.sleep(5000)
           throw new RuntimeException("Something went wrong")
         }
         i
       }.write.mode("append").csv("out/test-1"))
You should see a stack trace similar to the following.
21/04/05 23:39:22 ERROR Utils: Aborting task (0 + 1) / 1]
java.lang.RuntimeException: Something went wrong
at $line16.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.$anonfun$res2$2(<console>:27)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.mapelements_doConsume_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.deserializetoobject_doConsume_0$(Unknown Source)
Atomicity and consistency are actually preserved when appending records: Spark stages the new output in temporary files and only commits them to the final location once the tasks complete successfully, so the failed append leaves the existing data untouched. The output directory still contains the original 100 records.
➜ ls -l out/test-1
total 8
-rw-r--r-- 1 - - 0 Apr 5 23:33 _SUCCESS
-rw-r--r-- 1 - - 290 Apr 5 23:33 part-00000-9576ded5-bb45-4a7d-859d-58d187fe607c-c000.csv
scala> spark.read.csv("out/test-1").count()
res10: Long = 100
Atomicity comes with a performance hit. To learn more, read the Databricks writeup here.
Atomicity and Consistency for Overwrite Operation
The write operation with an overwrite save mode deletes the old file and creates a new one. Let’s look at the following example.
scala> spark.range(100)
         .repartition(1)
         .write
         .mode("overwrite")
         .csv("out/test-2")
➜ ls -l out/test-2
total 8
-rw-r--r-- 1 - - 0 Apr 6 19:55 _SUCCESS
-rw-r--r-- 1 - - 290 Apr 6 19:55 part-00000-c425c5a1-d39f-4012-aa12-92c057ed00ff-c000.csv
Now let’s execute a failing job that overwrites the existing file.
scala> scala.util.Try(spark.range(100).repartition(1).map { i =>
         if (i > 50) {
           Thread.sleep(5000)
           throw new RuntimeException("Something went wrong")
         }
         i
       }.write.mode("overwrite").csv("out/test-2"))
Notice that both files were deleted. This means that Spark does not guarantee atomicity or consistency for overwrite save operations.
ls -l out/test-2
The problem is even more subtle. What happens if the second job completes? Do we still have a consistency problem?
The answer is yes. There is a window during which you have no data at all: between the time the old data is deleted and the time the new data is written out.
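To make that window concrete, here is a hypothetical reader polling out/test-2 while an overwrite is in flight. Depending on timing, it either fails because the path has been deleted or sees only whatever data happens to be there.
import scala.util.{Failure, Success, Try}

// A reader that races with the overwrite. If it runs inside the gap
// (old data deleted, new data not yet committed), the path is simply gone.
Try(spark.read.csv("out/test-2").count()) match {
  case Success(n) => println(s"rows visible to the reader: $n")
  case Failure(e) => println(s"read failed: ${e.getMessage}")
}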
Atomicity and Consistency using Delta Lake
Let’s try running the above example using Delta Lake.
scala> spark.range(100)
         .repartition(1)
         .write
         .mode("overwrite")
         .format("delta")
         .save("out/test-3")
Notice that you now have a Parquet data file along with a Delta transaction log (the _delta_log directory).
ls -l out/test-3
total 8
drwxr-xr-x 3 - - 96 Apr 15 23:33 _delta_log
-rw-r--r-- 1 - - 1151 Apr 15 23:33 part-00000-7173f5e1-dce7-429d-b3a9-cbb34e9af662-c000.snappy.parquet
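Before breaking anything, we can sanity-check the table and peek at the transaction log. As a sketch (assuming the standard Delta log layout), the first commit is recorded as 00000000000000000000.json inside _delta_log, and it is plain JSON describing the protocol, the table metadata, and the files added by the write.
// The table reads back the 100 rows we just wrote.
spark.read.format("delta").load("out/test-3").count()

// Each line of the first commit file holds one action
// (commitInfo, protocol, metaData, add).
spark.read.json("out/test-3/_delta_log/00000000000000000000.json").show(false)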
Now let’s execute a failing job that overwrites the existing table.
scala> scala.util.Try(spark.range(100).repartition(1).map { i =>
         if (i > 50) {
           Thread.sleep(5000)
           throw new RuntimeException("Something went wrong")
         }
         i
       }.write.mode("overwrite")
        .format("delta")
        .option("overwriteSchema", "true")
        .save("out/test-3"))
Notice how the failed job did not delete the previous 100 rows.
scala> spark.read.format("delta").load("out/test-3").count()
res29: Long = 100
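As a further check, here is a sketch using the DeltaTable API that ships with delta-core 0.8.0. The table history lists only the commits that actually succeeded, so the aborted overwrite never became a version of the table.
import io.delta.tables.DeltaTable

// Each row is a committed version; the failed overwrite does not appear here.
DeltaTable.forPath(spark, "out/test-3")
  .history()
  .select("version", "timestamp", "operation")
  .show(false)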
Summary
Delta Lake brings ACID transactions to your data lakes. We have demonstrated that with ACID transactions, data is not lost if a job fails during the “overwrite” operation.