Nick Dalalelis

Cloud Software Engineer

Delta Lake - ACID Transactions

2021-03-03 Nick DalalelisSpark

I’m very excited to write this blog post on how Delta Lake brings ACID transactions to Apache Spark. Apache Spark is a data processing framework that can quickly perform analytical workloads on very large data sets in your data lake. Data lakes typically have multiple data pipelines reading and writing data concurrently. Data engineers go through a tedious process to ensure data integrity, due to the lack of transactions with Apache Spark. Let’s explore how to achieve ACID transactions with the help of Delta Lake.

Set up Apache Spark with Delta Lake

In order to follow along, you will first need to download Spark 3.1.1 from here. All of our examples will use the Spark Scala Shell.

Extract the Spark archive

tar -xzvf spark-3.1.1-bin-hadoop3.2.tgz

Set SPARK_HOME variable. Below is an example where Spark 3.1.1 was unpacked in /Users/nickdala/stuff

export SPARK_HOME="/Users/nickdala/stuff/spark-3.1.1-bin-hadoop3.2"

Add Spark bin directory to PATH variables

export PATH=$SPARK_HOME/bin:$PATH

Verify that the prerequisite steps have been executed correctly

which spark-submit

You should see spark-submit located in your SPARK_HOME directory

/Users/nickdala/stuff/spark-3.1.1-bin-hadoop3.2/bin/spark-submit

Spark Shell

Spark provides an interactive shell, allowing us to easily prototype certain operations quickly. All of the examples will be coded in Scala. Let’s begin by executing the following command to start spark-shell with Delta Lake.

spark-shell --packages io.delta:delta-core_2.12:0.8.0 --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"

You should see the following.

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.1.1
      /_/
         
Using Scala version 2.12.10 (OpenJDK 64-Bit Server VM, Java 11.0.7)
Type in expressions to have them evaluated.
Type :help for more information.

scala>

ACID Properties

In this blog, we will cover the ACID properties atomicity and consistency. Isolation and durability will be covered in a follow up blog.

Atomicity guarantees that each transaction is treated as a single “unit”, which either succeeds completely or fails completely. Consistency ensures that the data is always in a valid state.

Spark Save operations

The Spark DataFrame save operation can optionally take a SaveMode parameter that specifies how to handle existing data if present. It’s important to realize that these save modes do not utilize any locking and are not atomic. This means that a failed job might leave an incomplete file.

The documentation goes on further to state, "Additionally, when performing an Overwrite, the data will be deleted before writing out the new data." In other words, a failed job may remove old data and leave corrupt data.

spark-save-mode

In reality, the Spark DataFrame writer API internally provides some degree of atomicity. This is illustrated below using the append save operation. We will also see how Spark fails to provide atomicity or consistency during an overwrite save operation.

Atomicity and Consistency for Append Operation

Let’s start by writing a spark job that creates 100 records and saves them to a file. Using the spark-shell that we launched earlier, copy the following code.

scala> spark.range(100)
            .repartition(1)
            .write
            .mode("overwrite")
            .csv("out/test-1")

Verify that the file ‘test-1’ was created and that it contains 100 records.

ls -l out/test-1
total 8
-rw-r--r--  1 -  -    0 Apr  5 23:33 _SUCCESS
-rw-r--r--  1 -  -  290 Apr  5 23:33 part-00000-9576ded5-bb45-4a7d-859d-58d187fe607c-c000.csv
scala> spark.read.csv("out/test-1").show()
+---+
|_c0|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
|  5|
|  6|
|  7|
|  8|
|  9|
| 10|
| 11|
| 12|
| 13|
| 14|
| 15|
| 16|
| 17|
| 18|
| 19|
+---+
only showing top 20 rows


scala> spark.read.csv("out/test-1").count()
res10: Long = 100

Now let’s write a job that appends 100 records to the file that fails in the middle.

scala> scala.util.Try(spark.range(100).repartition(1).map{ i =>
     if (i>50) {
       Thread.sleep(5000)
       throw new RuntimeException("Something went wrong")
     }
     i
    }.write.mode("append").csv("out/test-1"))

You should see a stack trace similar to the following.

21/04/05 23:39:22 ERROR Utils: Aborting task                        (0 + 1) / 1]
java.lang.RuntimeException: Something went wrong
	at $line16.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.$anonfun$res2$2(<console>:27)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.mapelements_doConsume_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.deserializetoobject_doConsume_0$(Unknown Source)

Atomicity and consistencyis actually accomplished when appending records. The previous file still contains the original 100 records.

ls -l out/test-1
total 8
-rw-r--r--  1 -  -    0 Apr  5 23:33 _SUCCESS
-rw-r--r--  1 -  -  290 Apr  5 23:33 part-00000-9576ded5-bb45-4a7d-859d-58d187fe607c-c000.csv
scala> spark.read.csv("out/test-1").count()
res10: Long = 100

Atomicity comes with performance hit. To learn more, read the Databricks writeup here.

Atomicity and Consistency for Overwrite Operation

The write operation with an overwrite save mode deletes the old file and creates a new one. Let’s look at the following example.

scala> spark.range(100)
            .repartition(1)
            .write
            .mode("overwrite")
            .csv("out/test-2")
ls -l out/test-2 
total 8
-rw-r--r--  1 -  -    0 Apr  6 19:55 _SUCCESS
-rw-r--r--  1 -  -  290 Apr  6 19:55 part-00000-c425c5a1-d39f-4012-aa12-92c057ed00ff-c000.csv

Now lets execute a failing job that overwrites the existing file.

scala> scala.util.Try(spark.range(100).repartition(1).map{ i =>
     if (i>50) {
       Thread.sleep(5000)
       throw new RuntimeException("Something went wrong")
     }
     i
    }.write.mode("overwrite").csv("out/test-2"))

Notice that both files were deleted. This means that Spark does not guarantee atomicity or consistency for overwrite save operations.

ls -l out/test-2

The problem is even more subtle. What happens if the second job completes? Do we still have a consistency problem? The answer is yes. That’s because there’s a point in time when you have no data; between the time the data is deleted and the new data is not written out.

Atomicity and Consistency using Delta Lake

Let’s try running the above example using delta.

scala> spark.range(100)
            .repartition(1)
            .write
            .mode("overwrite")
            .format("delta")
            .save("out/test-3")

Notice now that you have a parquet file with a delta transaction log.

ls -l out/test-3
total 8
drwxr-xr-x  3 -  -    96 Apr 15 23:33 _delta_log
-rw-r--r--  1 -  -  1151 Apr 15 23:33 part-00000-7173f5e1-dce7-429d-b3a9-cbb34e9af662-c000.snappy.parquet

Now lets execute a failing job that overwrites the existing file.

scala> scala.util.Try(spark.range(100).repartition(1).map{ i =>
        if (i>50) {
            Thread.sleep(5000)
            throw new RuntimeException("Something went wrong")
        }
        i
       }.write.mode("overwrite")
        .format("delta")
        .option("overwriteSchema", "true")
        .save("out/test-3"))

Notice how the failed job did not delete the previous 100 rows.

scala> spark.read.format("delta").load("out/test-3").count()
res29: Long = 100

Summary

Delta Lake brings ACID transactions to your data lakes. We have demonstrated that with ACID transactions, data is not lost if a job fails during the “overwrite” operation.