Here are a few quick recipes to solve some common issues with Apache Spark. All examples are based on Java 8 (although I do not consciously use any of the version 8 features) and on Spark v1.6.2 and Spark v2.0.0. The examples (and more) can be cloned from GitHub: https://github.com/jgperrin/net.jgp.labs.spark.git.
Build a DataFrame from a Text File
Spark v2.0.0
package net.jgp.labs.spark;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class TextFileToDataFrame {

  public static void main(String[] args) {
    System.out.println("Working directory = " + System.getProperty("user.dir"));
    TextFileToDataFrame app = new TextFileToDataFrame();
    app.start();
  }

  private void start() {
    SparkSession spark = SparkSession.builder()
        .appName("DataFrame from Text File")
        .master("local[*]")
        .getOrCreate();

    String filename = "data/simple-data-file.txt";
    Dataset<Row> df = spark.read().text(filename);
    df.show();
  }
}
Spark v1.6.2
package net.jgp.labs.spark;

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

public class FirstTextFile {

  public static void main(String[] args) {
    System.out.println("Working directory = " + System.getProperty("user.dir"));
    FirstTextFile app = new FirstTextFile();
    app.start();
  }

  private void start() {
    SparkConf conf = new SparkConf().setAppName("DataFrame from Text File").setMaster("local");
    SparkContext sc = new SparkContext(conf);
    SQLContext sqlContext = new SQLContext(sc);

    String filename = "data/simple-data-file.txt";
    DataFrame df = sqlContext.read().text(filename);
    df.show();
  }
}
The System.out.println() at the top of main() dumps the current working directory, which is always useful if you lose your files (it never happens to me).
Output
And the output is:
Working directory = /Users/jgp/git/net.jgp.labs.spark
+-----+
|value|
+-----+
|    1|
|    2|
|    3|
|    4|
|    5|
|    6|
|    7|
+-----+
The file data/simple-data-file.txt simply contains the integers 1 through 7, one per line.
Connect Locally
Spark v2.0.0
package net.jgp.labs.spark;

import org.apache.spark.sql.SparkSession;

public class ConnectLocally {

  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("Hello Spark")
        .master("local")
        .getOrCreate();
    System.out.println("Hello, Spark v." + spark.version());
  }
}
The output is:
Hello, Spark v.2.0.0
Spark v1.6.2
package net.jgp.labs.spark;

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;

public class HelloSpark {

  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("Hello Spark").setMaster("local");
    SparkContext sc = new SparkContext(conf);
    System.out.println("Hello, Spark v." + sc.version());
  }
}
Connect Remotely
This example shows how to connect to a remote Spark server/cluster and check that it is live.
Spark v2.0.0
package net.jgp.labs.spark;

import org.apache.spark.sql.SparkSession;

public class ConnectRemotely {

  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("myApp")
        .master("spark://10.0.100.120:7077")
        .getOrCreate();
    System.out.println("Hello, Spark v." + spark.version());
  }
}
Spark v1.6.2
package net.jgp.labs.spark;

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;

public class ConnectRemotely {

  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("myApp").setMaster("spark://10.0.100.120:7077");
    SparkContext sc = new SparkContext(conf);
    System.out.println("Hello, Remote Spark v." + sc.version());
  }
}
Read a CSV File into a Dataset
Read a CSV file composed of tuples and load it into a Dataset. This example uses the new built-in CSV support of Apache Spark v2.0.0.
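For reference, the input file data/tuple-data-file.csv presumably looks like this (comma-separated values, no header row):

1,15
2,25
3,35
4,45
5,55
6,65
7,75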
Spark v2.0.0
package net.jgp.labs.spark;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class CsvToDataset {

  public static void main(String[] args) {
    System.out.println("Working directory = " + System.getProperty("user.dir"));
    CsvToDataset app = new CsvToDataset();
    app.start();
  }

  private void start() {
    SparkSession spark = SparkSession.builder().appName("CSV to Dataset").master("local").getOrCreate();

    String filename = "data/tuple-data-file.csv";
    Dataset<Row> df = spark.read().format("csv")
        .option("inferSchema", "true")
        .option("header", "false")
        .load(filename);
    df.show();
  }
}
Output
+---+---+
|_c0|_c1|
+---+---+
|  1| 15|
|  2| 25|
|  3| 35|
|  4| 45|
|  5| 55|
|  6| 65|
|  7| 75|
+---+---+
Migration from Databricks CSV parser
Note: with the Databricks CSV parser (used in the Spark v1.6.2 examples), columns are named C0, C1, and so on. With the standard CSV parser in v2.0.0, columns are called _c0, _c1…
You can add the following code before df.show() to make your v2.0.0 column names compatible with your v1.6.2 code.
int count = df.columns().length;
for (int i = 0; i < count; i++) {
  String oldColName = "_c" + i;
  String newColName = "C" + i;
  df = df.withColumn(newColName, df.col(oldColName)).drop(oldColName);
}
Output:
+---+---+
| C0| C1|
+---+---+
|  1| 15|
|  2| 25|
|  3| 35|
|  4| 45|
|  5| 55|
|  6| 65|
|  7| 75|
+---+---+
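As a shorter alternative (a minimal sketch, not part of the original recipe), the same renaming can be done with withColumnRenamed(), which also keeps the column order intact:

int count = df.columns().length;
for (int i = 0; i < count; i++) {
  // withColumnRenamed() renames in place, so the column order is preserved
  df = df.withColumnRenamed("_c" + i, "C" + i);
}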
Register and Call an Internal UDF (User Defined Function)
This basic UDF multiplies the value of an integer column by 2.
Spark v2.0.0
package net.jgp.labs.spark;

import static org.apache.spark.sql.functions.callUDF;

import java.io.Serializable;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;

public class BasicUdfFromTextFile implements Serializable {
  private static final long serialVersionUID = 3492970200940899011L;

  public static void main(String[] args) {
    System.out.println("Working directory = " + System.getProperty("user.dir"));
    BasicUdfFromTextFile app = new BasicUdfFromTextFile();
    app.start();
  }

  private void start() {
    SparkSession spark = SparkSession.builder().appName("CSV to Dataset").master("local").getOrCreate();

    spark.udf().register("x2Multiplier", new UDF1<Integer, Integer>() {
      private static final long serialVersionUID = -5372447039252716846L;

      @Override
      public Integer call(Integer x) {
        return x * 2;
      }
    }, DataTypes.IntegerType);

    String filename = "data/tuple-data-file.csv";
    Dataset<Row> df = spark.read().format("csv")
        .option("inferSchema", "true")
        .option("header", "false")
        .load(filename);
    df = df.withColumn("label", df.col("_c0")).drop("_c0");
    df = df.withColumn("value", df.col("_c1")).drop("_c1");
    df = df.withColumn("x2", callUDF("x2Multiplier", df.col("value").cast(DataTypes.IntegerType)));
    df.show();
  }
}
Spark v1.6.2
package net.jgp.labs.spark;

import static org.apache.spark.sql.functions.callUDF;

import java.io.Serializable;

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;

public class BasicUdfFromTextFile implements Serializable {
  private static final long serialVersionUID = 3492970200940899011L;

  public static void main(String[] args) {
    BasicUdfFromTextFile app = new BasicUdfFromTextFile();
    app.start();
  }

  private void start() {
    SparkConf conf = new SparkConf().setAppName("Basic UDF from Text File").setMaster("local");
    SparkContext sc = new SparkContext(conf);
    SQLContext sqlContext = new SQLContext(sc);

    sqlContext.udf().register("x2Multiplier", new UDF1<Integer, Integer>() {
      private static final long serialVersionUID = -5372447039252716846L;

      @Override
      public Integer call(Integer x) {
        return x * 2;
      }
    }, DataTypes.IntegerType);

    String filename = "data/tuple-data-file.csv";
    DataFrame df = sqlContext.read().format("com.databricks.spark.csv")
        .option("inferSchema", "true")
        .option("header", "false")
        .load(filename);
    df = df.withColumn("label", df.col("C0")).drop("C0");
    df = df.withColumn("value", df.col("C1")).drop("C1");
    df = df.withColumn("x2", callUDF("x2Multiplier", df.col("value").cast(DataTypes.IntegerType)));
    df.show();
  }
}
Output
+-----+-----+---+
|label|value| x2|
+-----+-----+---+
|    1|   15| 30|
|    2|   25| 50|
|    3|   35| 70|
|    4|   45| 90|
|    5|   55|110|
|    6|   65|130|
|    7|   75|150|
+-----+-----+---+
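Since UDF1 is an interface with a single method, the registration in the v2.0.0 example could also be written with a Java 8 lambda. This is only a sketch; as mentioned in the introduction, the examples deliberately avoid Java 8 features:

// Equivalent registration using a lambda; the cast to UDF1 selects the right overload.
spark.udf().register("x2Multiplier",
    (UDF1<Integer, Integer>) x -> x * 2,
    DataTypes.IntegerType);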
Register and Call an External UDF (User Defined Function)
This basic UDF multiplies the value of an integer column by 2. This example uses two files: the application itself and the UDF class.
Spark v2.0.0
package net.jgp.labs.spark;

import static org.apache.spark.sql.functions.callUDF;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;

import net.jgp.labs.spark.udf.Multiplier2;

public class BasicExternalUdfFromTextFile {

  public static void main(String[] args) {
    System.out.println("Working directory = " + System.getProperty("user.dir"));
    BasicExternalUdfFromTextFile app = new BasicExternalUdfFromTextFile();
    app.start();
  }

  private void start() {
    SparkSession spark = SparkSession.builder().appName("CSV to Dataset").master("local").getOrCreate();

    spark.udf().register("x2Multiplier", new Multiplier2(), DataTypes.IntegerType);

    String filename = "data/tuple-data-file.csv";
    Dataset<Row> df = spark.read().format("csv")
        .option("inferSchema", "true")
        .option("header", "false")
        .load(filename);
    df = df.withColumn("label", df.col("_c0")).drop("_c0");
    df = df.withColumn("value", df.col("_c1")).drop("_c1");
    df = df.withColumn("x2", callUDF("x2Multiplier", df.col("value").cast(DataTypes.IntegerType)));
    df.show();
  }
}
Spark v1.6.2
package net.jgp.labs.spark;

import static org.apache.spark.sql.functions.callUDF;

import java.io.Serializable;

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;

import net.jgp.labs.spark.udf.Multiplier2;

public class BasicExternalUdfFromTextFile {

  public static void main(String[] args) {
    System.out.println("Working directory = " + System.getProperty("user.dir"));
    BasicExternalUdfFromTextFile app = new BasicExternalUdfFromTextFile();
    app.start();
  }

  private void start() {
    SparkConf conf = new SparkConf().setAppName("Basic UDF from Text File").setMaster("local");
    SparkContext sc = new SparkContext(conf);
    SQLContext sqlContext = new SQLContext(sc);

    sqlContext.udf().register("x2Multiplier", new Multiplier2(), DataTypes.IntegerType);

    String filename = "data/tuple-data-file.csv";
    DataFrame df = sqlContext.read().format("com.databricks.spark.csv")
        .option("inferSchema", "true")
        .option("header", "false")
        .load(filename);
    df = df.withColumn("label", df.col("C0")).drop("C0");
    df = df.withColumn("value", df.col("C1")).drop("C1");
    df = df.withColumn("x2", callUDF("x2Multiplier", df.col("value").cast(DataTypes.IntegerType)));
    df.show();
  }
}
UDF Code
The UDF (User Defined Function) code is the same for both Apache Spark v1.6.2 and v2.0.0.
package net.jgp.labs.spark.udf;

import org.apache.spark.sql.api.java.UDF1;

public class Multiplier2 implements UDF1<Integer, Integer> {
  private static final long serialVersionUID = -4519338105113996424L;

  @Override
  public Integer call(Integer t1) throws Exception {
    return t1 * 2;
  }
}
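Whichever way it is registered, once the UDF has a name it can also be called from Spark SQL instead of through callUDF(). A minimal v2.0.0 sketch, reusing the spark and df variables from the example above (the view name tuples is my own choice, not part of the original example):

// Expose the Dataset as a temporary view so it can be queried with SQL.
df.createOrReplaceTempView("tuples");

// The registered UDF is available by name in SQL; value was already inferred as an integer.
Dataset<Row> df2 = spark.sql(
    "SELECT label, value, x2Multiplier(value) AS x2 FROM tuples");
df2.show();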
NC Schools by School District
This example downloads a JSON file from the Open Data portal of Durham and starts some analysis: counting North Carolina schools per school district.
You will need:
- The net.jgp.commons.download.DownloadManager class, which you can get from GitHub at https://github.com/jgperrin/net.jgp.commons.download.git.
- A /Pool directory, which you can easily replace with /tmp if you want.
/**
 * NC schools by school district analysis.
 */
package net.jgp.labs.spark;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import net.jgp.commons.download.DownloadManager;

public class ListNCSchoolDistricts {

  public static void main(String[] args) {
    String filename = DownloadManager.getFilename(
        "https://opendurham.nc.gov/explore/dataset/north-carolina-school-performance-data/download/?format=json&timezone=America/New_York");
    System.out.println("File " + filename + " downloaded");

    SparkSession spark = SparkSession.builder().appName("NC Schools").master("local").getOrCreate();

    String fileToAnalyze = "/Pool/" + filename;
    System.out.println("File to analyze: " + fileToAnalyze);

    Dataset<Row> df;
    df = spark.read().option("dateFormat", "yyyy-mm-dd").json(fileToAnalyze);
    df = df.withColumn("district", df.col("fields.district"));
    df = df.groupBy("district").count().orderBy(df.col("district"));
    df.show(150, false);
  }
}
Partial output:
File 644ae7209488a5a34f6ecf6c6aa27178.dl downloaded
File to analyze: /Pool/644ae7209488a5a34f6ecf6c6aa27178.dl
+------------------------------+-----+
|district                      |count|
+------------------------------+-----+
|Alamance-Burlington Schools   |34   |
|Alexander County Schools      |10   |
|Alleghany County Schools      |4    |
|Anson County Schools          |10   |
|Ashe County Schools           |5    |
|Asheboro City Schools         |8    |
|Asheville City Schools        |8    |
|Avery County Schools          |10   |
|Beaufort County Schools       |12   |
|Bertie County Schools         |8    |
|Bladen County Schools         |13   |
|Brunswick County Schools      |18   |
|Buncombe County Schools       |39   |
|Burke County Schools          |25   |
|Cabarrus County Schools       |36   |
|Caldwell County Schools       |24   |
|Camden County Schools         |5    |
|Carteret County Public Schools|16   |
|Caswell County Schools        |6    |
|Catawba County Schools        |27   |
|Chapel Hill-Carrboro Schools  |18   |
...
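From there, the analysis can go further. For example, a minimal sketch (not in the original code) that keeps only the districts with more than 20 schools, largest first:

// Assumes: import static org.apache.spark.sql.functions.col;
//          import static org.apache.spark.sql.functions.desc;
// Keep only districts with more than 20 schools, sorted by number of schools, descending.
Dataset<Row> bigDistricts = df
    .filter(col("count").gt(20))
    .orderBy(desc("count"));
bigDistricts.show(20, false);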
More to Come
More recipes are on their way. Use the comments if you have a specific one in mind and I'll see if I can add it.
Updates
- 2016-07-22: First version.
- 2016-07-25: Added 2 UDF examples.
- 2016-07-29: Migration to Spark v2.0.0 started.
- 2016-07-29: NC School example added, parsing JSON into a Dataset.