UDF stands for User Defined Function. With UDFs, you can easily extend Apache Spark with your own routines and business logic.
Let's see how to build and deploy them.
There are 2 operations in your code:
- Registration – basically, you tell Spark that you are going to use this UDF.
- Use – when you want to use it with your DataFrame.
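Put together, a minimal sketch of both operations could look like this (assuming a SQLContext named sqlContext, a DataFrame df with an integer column called value, and Java 8 so the UDF1 can be written as a lambda; the cast is needed because register() is overloaded for every arity):

import static org.apache.spark.sql.functions.callUDF;

import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;

// 1. Registration: declare the UDF under the SQL-visible name "x2Multiplier".
sqlContext.udf().register("x2Multiplier",
    (UDF1<Integer, Integer>) x -> x * 2,
    DataTypes.IntegerType);

// 2. Use: call the UDF by name on a DataFrame column.
df = df.withColumn("x2", callUDF("x2Multiplier", df.col("value")));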
There are 2 ways of defining a UDF: you can embed it directly in your code when you register it, or keep it in a separate class.
Pro of having the UDF internally:
- Easily deployed.
Cons:
- The top-level class needs to be serializable and it may not always be possible.
- Readability suffers.
Pros of having the UDF externally:
- Higher readability.
- The class can be more complex – remember the serialization constraint, which now applies only to the UDF class, not to your application class.
Con:
- Need to deploy the package separately and tell Spark where to find it.
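To make the first con concrete: an anonymous UDF declared inside an instance method keeps a reference to its enclosing object, so Spark has to serialize that object along with the UDF; if it is not Serializable, the job fails at runtime with a "Task not serializable" error. A minimal sketch (the class name is hypothetical):

import java.io.Serializable;

// Hypothetical application class that registers the UDF from an instance
// method: it must implement Serializable because the anonymous UDF1 keeps
// an implicit reference to "this".
public class TransformationApp implements Serializable {
  private static final long serialVersionUID = 1L;

  // ... SQLContext setup and the sqlContext.udf().register(...) call go here ...
}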
Registration
The whole code can be found in the Apache Spark Java Recipes.
Internal
sqlContext.udf().register("x2Multiplier", new UDF1<Integer, Integer>() {
  private static final long serialVersionUID = -5372447039252716846L;

  @Override
  public Integer call(Integer x) {
    return x * 2;
  }
}, DataTypes.IntegerType);
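UDF1 is only one member of a family of interfaces (UDF1 through UDF22, one per number of parameters). If your routine took two parameters, the same pattern would apply with UDF2; a sketch, with a hypothetical "multiply" UDF:

import org.apache.spark.sql.api.java.UDF2;
import org.apache.spark.sql.types.DataTypes;

// Hypothetical two-argument UDF: multiplies its two integer parameters.
sqlContext.udf().register("multiply", new UDF2<Integer, Integer, Integer>() {
  private static final long serialVersionUID = 1L;

  @Override
  public Integer call(Integer a, Integer b) {
    return a * b;
  }
}, DataTypes.IntegerType);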
External
When you deal with an external UDF, you will have 2 classes/files: your application, which imports and registers the UDF, and the UDF class itself.
import net.jgp.labs.spark.udf.Multiplier2;
sqlContext.udf().register("x2Multiplier", new Multiplier2(), DataTypes.IntegerType);
package net.jgp.labs.spark.udf;

import org.apache.spark.sql.api.java.UDF1;

public class Multiplier2 implements UDF1<Integer, Integer> {
  private static final long serialVersionUID = -4519338105113996424L;

  @Override
  public Integer call(Integer t1) throws Exception {
    return t1 * 2;
  }
}
You can download the code here from GitHub: https://github.com/jgperrin/net.jgp.labs.spark.git.
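A side benefit of the external form is that the UDF is a plain Java class, so you can test it without starting Spark at all. A sketch, assuming JUnit 4 on the test classpath (the test class name is hypothetical):

import static org.junit.Assert.assertEquals;

import org.junit.Test;

import net.jgp.labs.spark.udf.Multiplier2;

public class Multiplier2Test {

  @Test
  public void callShouldDoubleItsInput() throws Exception {
    // The UDF is exercised directly, outside of any Spark context.
    assertEquals(Integer.valueOf(42), new Multiplier2().call(21));
  }
}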
Usage
Usage is the same in both cases (callUDF is statically imported from org.apache.spark.sql.functions):
df = df.withColumn("x2",
    callUDF("x2Multiplier", df.col("value").cast(DataTypes.IntegerType)));
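Because the UDF is registered under a name, it is also callable from Spark SQL once the DataFrame is exposed as a table. A sketch, with a hypothetical table name:

// Expose the DataFrame as a temporary table, then call the UDF in SQL.
df.registerTempTable("numbers");
df = sqlContext.sql(
    "SELECT value, x2Multiplier(CAST(value AS INT)) AS x2 FROM numbers");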
Deployment
We need to ensure that Spark knows about the library, so I updated both the configuration file and the environment file. You may only need one of them.
# In conf/spark-env.sh:
SPARK_CLASSPATH=/home/jgp/.m2/repository/io/oplo/commons/spark/0.0.1/spark-0.0.1.jar

# In conf/spark-defaults.conf:
spark.driver.extraClassPath   /home/jgp/.m2/repository/io/oplo/commons/spark/0.0.1/spark-0.0.1.jar
spark.executor.extraClassPath /home/jgp/.m2/repository/io/oplo/commons/spark/0.0.1/spark-0.0.1.jar
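Another option, still in spark-defaults.conf, is the spark.jars property (the equivalent of spark-submit's --jars flag), which distributes the JAR to both the driver and the executors; a sketch reusing the same path:

spark.jars /home/jgp/.m2/repository/io/oplo/commons/spark/0.0.1/spark-0.0.1.jar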
Conclusion
I recommend early tests using internal classes. When your code matures, isolate the UDFs in a separate project/JAR, which will then have its own lifecycle and, eventually, a different team/engineer/data scientist to maintain it.