UDF are not UFO! | Photograph of an alleged UFO in New Jersey, taken on July 31, 1952 | Wikipedia

UDF stands for user-defined function. With UDFs, you can easily extend Apache Spark with your own routines and business logic.

Let’s see how we can build them and deploy them.

There are 2 operations in your code:

  1. Registration – basically, you tell Spark that you are going to use this UDF.
  2. Use – when you want to use it with your DataFrame.
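
To make the two steps concrete without a Spark installation, here is a toy model in plain Java. It is not how Spark works internally: the class `UdfRegistrySketch` and its `register` and `call` methods are hypothetical stand-ins. It only illustrates the idea that registration binds a name to a function, and that use looks the function up by that name.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class UdfRegistrySketch {

    // Hypothetical registry: maps a UDF name to its implementation.
    private final Map<String, Function<Integer, Integer>> registry = new HashMap<>();

    // Step 1, registration: bind a name to a function.
    public void register(String name, Function<Integer, Integer> udf) {
        registry.put(name, udf);
    }

    // Step 2, use: look the function up by name and apply it to a value.
    public Integer call(String name, Integer value) {
        Function<Integer, Integer> udf = registry.get(name);
        if (udf == null) {
            throw new IllegalArgumentException("Undefined function: " + name);
        }
        return udf.apply(value);
    }

    public static void main(String[] args) {
        UdfRegistrySketch udfs = new UdfRegistrySketch();
        udfs.register("x2Multiplier", x -> x * 2);
        System.out.println(udfs.call("x2Multiplier", 21)); // prints 42
    }
}
```

Calling a name that was never registered fails at the point of use, which is why registration has to come first.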

There are 2 ways of defining a UDF. You can embed it directly in your code when you register it, or you can put it in a separate class.

Pro of having the UDF internally:

  • Easily deployed.

Cons:

  • The enclosing (top-level) class needs to be serializable, which is not always possible.
  • The code is harder to read.
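
The serialization constraint can be reproduced in plain Java, without Spark. In the sketch below, `IntFunction1` is a hypothetical stand-in for Spark's `UDF1` (which also extends `Serializable`), and `Driver`, `Multiplier`, and `canSerialize` are names made up for the illustration. The embedded function reads a field of its enclosing class, so it keeps a reference to that enclosing instance, and serializing the function means serializing the enclosing object too.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class SerializationSketch {

    /** Hypothetical stand-in for Spark's UDF1, which also extends Serializable. */
    interface IntFunction1 extends Serializable {
        Integer call(Integer x);
    }

    /** A driver-like class that is NOT serializable. */
    static class Driver {
        private final int factor = 2;

        IntFunction1 embedded() {
            // Reads Driver.factor, so the anonymous class holds a reference
            // to the enclosing Driver instance.
            return new IntFunction1() {
                private static final long serialVersionUID = 1L;

                @Override
                public Integer call(Integer x) {
                    return x * factor;
                }
            };
        }
    }

    /** The same logic as a standalone class with no enclosing instance. */
    static class Multiplier implements IntFunction1 {
        private static final long serialVersionUID = 1L;

        @Override
        public Integer call(Integer x) {
            return x * 2;
        }
    }

    /** Returns true if Java serialization accepts the object. */
    static boolean canSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(canSerialize(new Driver().embedded())); // prints false
        System.out.println(canSerialize(new Multiplier()));        // prints true
    }
}
```

Making `Driver` implement `Serializable` would also let the embedded version pass, but that is exactly the constraint on the enclosing class described above.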

Pros of having the UDF externally:

  • Higher readability.
  • The class can be more complex, as long as the UDF class itself stays serializable.

Con:

  • Need to deploy the package separately and tell Spark where to find it.

Registration

The whole code can be found in the Apache Spark Java Recipes.

Internal

		sqlContext.udf().register("x2Multiplier", new UDF1<Integer, Integer>() {
			private static final long serialVersionUID = -5372447039252716846L;

			@Override
			public Integer call(Integer x) {
				return x * 2;
			}
		}, DataTypes.IntegerType);

External

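When you deal with an external UDF, you will have 2 classes/files: the code that registers the UDF, and the UDF class itself.

In the class that registers the UDF:

import net.jgp.labs.spark.udf.Multiplier2;

		sqlContext.udf().register("x2Multiplier", new Multiplier2(), DataTypes.IntegerType);

And the UDF, in its own file:

package net.jgp.labs.spark.udf;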

import org.apache.spark.sql.api.java.UDF1;

public class Multiplier2 implements UDF1<Integer, Integer> {

	private static final long serialVersionUID = -4519338105113996424L;

	@Override
	public Integer call(Integer t1) throws Exception {
		return t1 * 2;
	}

}

You can download the code here from GitHub: https://github.com/jgperrin/net.jgp.labs.spark.git.

Usage

Usage is the same in both cases (callUDF is a static import from org.apache.spark.sql.functions):

		df = df.withColumn("x2", callUDF("x2Multiplier", df.col("value").cast(DataTypes.IntegerType)));
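
One thing to watch for when the UDF runs on real data: a column can contain nulls (and, depending on Spark's settings, a cast that cannot convert a value may produce one), in which case `x * 2` on a null `Integer` throws a `NullPointerException`. A minimal null-safe sketch follows. The class name `NullSafeMultiplier2` is hypothetical, and to keep the example runnable without Spark it does not implement `UDF1`; in a Spark project it would, exactly like `Multiplier2`.

```java
public class NullSafeMultiplier2 {

    // In a Spark project this class would implement UDF1<Integer, Integer>
    // and declare a serialVersionUID, like Multiplier2.
    public Integer call(Integer x) {
        // Propagate null instead of failing on unboxing.
        if (x == null) {
            return null;
        }
        return x * 2;
    }

    public static void main(String[] args) {
        NullSafeMultiplier2 udf = new NullSafeMultiplier2();
        System.out.println(udf.call(21));   // prints 42
        System.out.println(udf.call(null)); // prints null
    }
}
```

Returning null keeps the row and leaves the new column empty for that row. Whether that is the right behavior depends on your data.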

Deployment

We need to ensure that Spark knows about the library, so I updated both the environment file (spark-env.sh) and the configuration file (spark-defaults.conf). You may not need to modify both of them.

SPARK_CLASSPATH=/home/jgp/.m2/repository/io/oplo/commons/spark/0.0.1/spark-0.0.1.jar

spark.driver.extraClassPath     /home/jgp/.m2/repository/io/oplo/commons/spark/0.0.1/spark-0.0.1.jar
spark.executor.extraClassPath   /home/jgp/.m2/repository/io/oplo/commons/spark/0.0.1/spark-0.0.1.jar

Conclusion

I recommend early tests using internal classes. When your code matures, isolate them in a separate project/JAR, which will then have its own lifecycle and, eventually, a different team/engineer/data scientist to maintain it.
