The most famous checkpoint is almost certainly Checkpoint Charlie. As you can see, even back in 1963, the Soviets were already teasing the United States with hyped technology. Source: Wikipedia.

Let’s understand what checkpoints can do for your Spark dataframes and go through a Java example of how to use them.

Checkpoint on Dataframe

In v2.1.0, Apache Spark introduced checkpoints on dataframes/datasets (I will continue to use the term dataframe for a Dataset<Row>). The Javadoc describes it as:

Returns a checkpointed version of this Dataset. Checkpointing can be used to truncate the logical plan of this Dataset, which is especially useful in iterative algorithms where the plan may grow exponentially. It will be saved to files inside the checkpoint directory set with SparkContext#setCheckpointDir.

However, I think it requires a little more explanation.

Why do I Want a Checkpoint?

Basically, I will use a checkpoint if I want to freeze the content of my dataframe before I do something else. This can be in iterative algorithms, as mentioned in the Javadoc, but also in recursive algorithms, or simply when branching out a dataframe to run different kinds of analytics on each branch.
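To picture why this matters for iterative algorithms, here is a toy model in plain Java. It is not Spark code: ToyFrame, ToyLineageDemo, and their methods are hypothetical names made up for illustration, and the chain here grows linearly rather than exponentially. Each transformation only remembers its parent, and a "checkpoint" computes the rows and starts a new frame with no history.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.UnaryOperator;

/** Toy stand-in for a dataframe (hypothetical, not a Spark class). */
final class ToyFrame {
	private final ToyFrame parent;              // null when the rows are materialized
	private final UnaryOperator<Integer> step;  // pending transformation, null for a root
	private final List<Integer> rows;           // only set for a root

	private ToyFrame(ToyFrame parent, UnaryOperator<Integer> step, List<Integer> rows) {
		this.parent = parent;
		this.step = step;
		this.rows = rows;
	}

	static ToyFrame of(List<Integer> rows) {
		return new ToyFrame(null, null, new ArrayList<>(rows));
	}

	/** Records a transformation without running it, like a lazy Spark transformation. */
	ToyFrame map(UnaryOperator<Integer> f) {
		return new ToyFrame(this, f, null);
	}

	/** Number of pending steps that must be replayed to compute this frame. */
	int lineageDepth() {
		return parent == null ? 0 : 1 + parent.lineageDepth();
	}

	/** Replays the whole lineage to produce the rows. */
	List<Integer> collect() {
		if (parent == null) {
			return new ArrayList<>(rows);
		}
		List<Integer> out = new ArrayList<>();
		for (Integer v : parent.collect()) {
			out.add(step.apply(v));
		}
		return out;
	}

	/** Toy checkpoint: compute the rows now and return a fresh root with no lineage. */
	ToyFrame checkpoint() {
		return of(collect());
	}
}

final class ToyLineageDemo {
	/** Applies "+1" repeatedly; checkpoints every checkpointEvery iterations (0 = never). */
	static ToyFrame iterate(ToyFrame start, int iterations, int checkpointEvery) {
		ToyFrame df = start;
		for (int i = 1; i <= iterations; i++) {
			df = df.map(v -> v + 1);
			if (checkpointEvery > 0 && i % checkpointEvery == 0) {
				df = df.checkpoint();
			}
		}
		return df;
	}

	public static void main(String[] args) {
		ToyFrame start = ToyFrame.of(Arrays.asList(1, 2, 3));
		ToyFrame plain = iterate(start, 10, 0);
		ToyFrame cut = iterate(start, 10, 4);
		System.out.println(plain.lineageDepth() + " " + plain.collect()); // 10 [11, 12, 13]
		System.out.println(cut.lineageDepth() + " " + cut.collect());     // 2 [11, 12, 13]
	}
}
```

Both runs produce the same rows, but the checkpointed one only has to replay the two steps recorded since its last checkpoint.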

Spark has been offering checkpoints on streaming since earlier versions (at least v1.2.0), but checkpoints on a dataframe are a different beast.

Types of Checkpoints

You can create 2 kinds of checkpoints.

An eager checkpoint will cut the lineage from previous dataframes and will allow you to start “fresh” from this point on. In other words, Spark will dump your dataframe to files in the directory specified by setCheckpointDir() and will start a fresh new dataframe from them. You will also need to wait for the operation to complete.

By contrast, a non-eager checkpoint is lazy: Spark only marks the dataframe for checkpointing and keeps the lineage from previous operations until the first action on the dataframe writes the checkpoint.
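The difference in timing between the two kinds can be sketched with a small toy model in plain Java. This is not Spark code: ToyCheckpoint and ToyCheckpointDemo are hypothetical names used only for illustration. In Spark itself, the choice is made through the boolean argument of checkpoint(), as in the checkpoint(false) call used in the example further down.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

/** Toy model (hypothetical, not a Spark class) of when a checkpoint does its work. */
final class ToyCheckpoint {
	private final Supplier<List<Integer>> plan; // how to compute the rows
	private List<Integer> saved;                // rows saved by the checkpoint, null until written

	private ToyCheckpoint(Supplier<List<Integer>> plan) {
		this.plan = plan;
	}

	/** eager = true computes and saves right away; eager = false waits for the first action. */
	static ToyCheckpoint checkpoint(Supplier<List<Integer>> plan, boolean eager) {
		ToyCheckpoint cp = new ToyCheckpoint(plan);
		if (eager) {
			cp.materialize();
		}
		return cp;
	}

	boolean isMaterialized() {
		return saved != null;
	}

	/** An "action": writes the pending checkpoint if needed, then reads the saved rows. */
	List<Integer> collect() {
		materialize();
		return new ArrayList<>(saved);
	}

	private void materialize() {
		if (saved == null) {
			saved = new ArrayList<>(plan.get());
		}
	}
}

final class ToyCheckpointDemo {
	public static void main(String[] args) {
		Supplier<List<Integer>> plan = () -> Arrays.asList(1, 2, 3);
		ToyCheckpoint eager = ToyCheckpoint.checkpoint(plan, true);
		ToyCheckpoint lazy = ToyCheckpoint.checkpoint(plan, false);
		System.out.println("eager materialized: " + eager.isMaterialized()); // true
		System.out.println("lazy materialized: " + lazy.isMaterialized());   // false
		lazy.collect();
		System.out.println("lazy after action: " + lazy.isMaterialized());   // true
	}
}
```

The eager variant pays the cost at the call site, while the lazy one defers it to the first action.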

And the Code is…

Now that we understand what a checkpoint is and how it works, let’s see how we implement that in Java. The code is part of my Apache Spark Java Cookbook on GitHub.

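import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class DataframeCheckpoint {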
	public static void main(String[] args) {
		DataframeCheckpoint app = new DataframeCheckpoint();
		app.start();
	}

	private void start() {
		SparkConf conf = new SparkConf().setAppName("Checkpoint").setMaster("local[*]");
		SparkContext sparkContext = new SparkContext(conf);
		// We need to specify where Spark will save the checkpoint file. It can be an HDFS location.
		sparkContext.setCheckpointDir("/tmp");
		SparkSession spark = SparkSession.builder().appName("Checkpoint").master("local[*]").getOrCreate();

		String filename = "data/tuple-data-file.csv";
		Dataset<Row> df1 = spark.read().format("csv").option("inferSchema", "true").option("header", "false")
				.load(filename);
		System.out.println("DF #1 - step #1: simple dump of the dataframe");
		df1.show();

		System.out.println("DF #2 - step #2: same as DF #1 - step #1");
		Dataset<Row> df2 = df1.checkpoint(false);
		df2.show();

		df1 = df1.withColumn("x", df1.col("_c0"));
		System.out.println("DF #1 - step #2: new column x, which is the same as _c0");
		df1.show();

		System.out.println("DF #2 - step #2: no operation was done on df2");
		df2.show();
	}
}

The output is, without much surprise:

DF #1 - step #1: simple dump of the dataframe
+---+---+
|_c0|_c1|
+---+---+
|  1|  5|
|  2| 13|
|  3| 27|
|  4| 39|
|  5| 41|
|  6| 55|
+---+---+

DF #2 - step #2: same as DF #1 - step #1
+---+---+
|_c0|_c1|
+---+---+
|  1|  5|
|  2| 13|
|  3| 27|
|  4| 39|
|  5| 41|
|  6| 55|
+---+---+

DF #1 - step #2: new column x, which is the same as _c0
+---+---+---+
|_c0|_c1|  x|
+---+---+---+
|  1|  5|  1|
|  2| 13|  2|
|  3| 27|  3|
|  4| 39|  4|
|  5| 41|  5|
|  6| 55|  6|
+---+---+---+

DF #2 - step #2: no operation was done on df2
+---+---+
|_c0|_c1|
+---+---+
|  1|  5|
|  2| 13|
|  3| 27|
|  4| 39|
|  5| 41|
|  6| 55|
+---+---+

Although this example is really basic, it shows how to use a checkpoint on a dataframe and how the original and the checkpointed dataframes evolve independently afterwards. Hopefully this will be useful to you too. A comment is always appreciated…

Thanks to Burak Yavuz at Databricks for his additional explanations.

Updates

2020-05-11 – There might be a few inaccuracies in this article. I highly recommend having a look at chapter 16 of Spark in Action, 2nd edition.
